diff --git a/cli/src/bin/invoices.rs b/cli/src/bin/invoices.rs index 2ba928e..95c05af 100644 --- a/cli/src/bin/invoices.rs +++ b/cli/src/bin/invoices.rs @@ -38,7 +38,7 @@ impl InvoiceResponseListener for LogInvoiceResponseListener { async fn main() -> Result<(), CliError> { env_logger::init(); - let relays = vec!["wss://relay.nostr.net".to_string()]; + let relays = vec!["wss://miodominio.net".to_string()]; let (receiver_key, receiver) = create_app_instance( "Receiver", @@ -89,6 +89,6 @@ async fn main() -> Result<(), CliError> { log::info!("Apps created"); - tokio::time::sleep(std::time::Duration::from_secs(10)).await; + tokio::time::sleep(std::time::Duration::from_secs(60)).await; Ok(()) } diff --git a/cli/src/bin/payment_request.rs b/cli/src/bin/payment_request.rs new file mode 100644 index 0000000..f50765d --- /dev/null +++ b/cli/src/bin/payment_request.rs @@ -0,0 +1,95 @@ +use std::{sync::Arc, time::Duration as StdDuration}; + +use app::{CallbackError, CashuRequestListener, PaymentRequestListener, PaymentStatusNotifier, RecurringPaymentRequest, SinglePaymentRequest}; +use cli::{CliError, create_app_instance, create_sdk_instance}; +use portal::protocol::model::{ + payment::{Currency, RecurringPaymentResponseContent, RecurringPaymentStatus, SinglePaymentRequestContent}, Timestamp +}; + +struct LogPaymentRequestListener; + +#[async_trait::async_trait] +impl PaymentRequestListener for LogPaymentRequestListener { + async fn on_single_payment_request( + &self, + event: SinglePaymentRequest, + notifier: Arc, + ) -> Result<(), CallbackError> { + log::info!("Receiver: Received Payment request: {:?}", event); + // Always approve for test + Ok(()) + } + async fn on_recurring_payment_request( + &self, + event: RecurringPaymentRequest, + ) -> Result { + log::info!("Receiver: Received Recurring Payment request: {:?}", event); + Ok(RecurringPaymentResponseContent { + request_id: event.content.request_id, + status: RecurringPaymentStatus::Rejected { reason: Some("User rejected".to_string()) }, + }) + } +} + +#[tokio::main] +async fn main() -> Result<(), CliError> { + env_logger::init(); + + let relays = vec!["wss://relay.nostr.net".to_string()]; + + let (receiver_key, receiver) = create_app_instance( + "Receiver", + "mass derive myself benefit shed true girl orange family spawn device theme", + relays.clone(), + ) + .await?; + let _receiver = receiver.clone(); + + tokio::spawn(async move { + log::info!("Receiver: Setting up Payment request listener"); + _receiver + .listen_for_payment_request(Arc::new(LogPaymentRequestListener)) + .await + .expect("Receiver: Error creating listener"); + }); + + let sender_sdk = create_sdk_instance( + "draft sunny old taxi chimney ski tilt suffer subway bundle once story", + relays.clone(), + ) + .await?; + + log::info!("Apps created, waiting 5 seconds before sending request"); + tokio::time::sleep(StdDuration::from_secs(5)).await; + + let request_content = SinglePaymentRequestContent { + amount: 1000, + currency: Currency::Millisats, + current_exchange_rate: None, + invoice: "invoice".to_string(), + auth_token: None, + expires_at: Timestamp::now_plus_seconds(300), + subscription_id: None, + description: Some("test".to_string()), + request_id: "test".to_string(), + }; + + let mut response = sender_sdk + .request_single_payment(receiver_key.public_key().0, vec![], request_content) + .await + .unwrap(); + + while let Some(resp) = response.next().await { + + match resp { + Ok(resp) => { + log::info!("Sender: Received payment: {:?}", resp); + } + Err(e) => { + log::error!("Sender: Error receiving payment: {}", e); + } + } + } + + Ok(()) +} diff --git a/cli/src/lib.rs b/cli/src/lib.rs index 198b616..3b13661 100644 --- a/cli/src/lib.rs +++ b/cli/src/lib.rs @@ -35,10 +35,7 @@ pub async fn create_app_instance( let app = PortalApp::new( keypair.clone(), - vec![ - "wss://relay.nostr.net".to_string(), - "wss://relay.damus.io".to_string(), - ], + _relays, Arc::new(LogRelayStatusChange), ) .await?; diff --git a/src/router/actor.rs b/src/router/actor.rs index 4c8b74d..09b60fc 100644 --- a/src/router/actor.rs +++ b/src/router/actor.rs @@ -1,13 +1,12 @@ +use core::f64; use std::{ collections::{HashMap, HashSet}, sync::Arc, }; +// use std::time::Duration inside async blocks use nostr::{ - event::{Event, EventBuilder, Kind}, - filter::{Filter, MatchEventOptions}, - message::{RelayMessage, SubscriptionId}, - nips::nip44, + event::{Event, EventBuilder, Kind}, filter::{Filter, MatchEventOptions}, message::{RelayMessage, SubscriptionId}, nips::nip44, types::RelayUrl }; use nostr_relay_pool::RelayPoolNotification; use serde::{Serialize, de::DeserializeOwned}; @@ -15,10 +14,9 @@ use tokio::sync::{mpsc, oneshot}; use tokio_stream::StreamExt; use crate::{ - protocol::{LocalKeypair, model::event_kinds::SUBKEY_PROOF}, + protocol::{model::event_kinds::SUBKEY_PROOF, LocalKeypair}, router::{ - CleartextEvent, Conversation, ConversationError, ConversationMessage, NotificationStream, - PortalId, Response, channel::Channel, + channel::{BroadcastResult, Channel}, CleartextEvent, Conversation, ConversationError, ConversationMessage, NotificationStream, PortalId, Response }, }; @@ -857,12 +855,12 @@ impl MessageRouterActorState { // check if Response has selected relays if let Some(selected_relays) = selected_relays_optional { for event in events_to_broadcast { - self.queue_event(channel, event, Some(selected_relays.clone())) + self.queue_event(channel, QueuedEvent::new_with_relays(event, selected_relays.clone())) .await?; } } else { for event in events_to_broadcast { - self.queue_event(channel, event, None).await?; + self.queue_event(channel, QueuedEvent::new(event)).await?; } // TODO: wait for confirmation from relays @@ -879,25 +877,52 @@ impl MessageRouterActorState { async fn queue_event( &self, channel: &Arc, - event: Event, - relays: Option>, + queue_event: QueuedEvent, ) -> Result<(), ConversationError> where C::Error: From, { - if let Some(relays) = relays { - // if selected relays, broadcast to selected relays - channel - .broadcast_to(relays, event) - .await - .map_err(|e| ConversationError::Inner(Box::new(e)))?; - } else { - // if not selected relays, broadcast to all relays - channel - .broadcast(event) - .await - .map_err(|e| ConversationError::Inner(Box::new(e)))?; + let result = queue_event.broadcast(channel).await?; + if !result.failed().is_empty() { + // incrementally delay until it is confirmed + log::warn!("New event with id {} is queued, delaying until it is confirmed", queue_event.event.id); + + let channel = channel.clone(); + let event = queue_event.clone(); + + tokio::spawn(async move { + let mut attempt = 0.0; + let base_delay = 1.0; // seconds + let multiplier = f64::consts::E; + let mut remaining_relays = result; + while !remaining_relays.failed().is_empty() { + attempt += 1.0; + + log::warn!("Attempt {} of event {} on relays {:?}", attempt, event.event.id, remaining_relays.failed()); + let mut secs = base_delay * multiplier.powf(attempt); + if secs > 300.0 { + secs = 300.0; + } + tokio::time::sleep(std::time::Duration::from_secs_f64(secs)).await; + match event.broadcast_remaining(&channel, &remaining_relays).await { + Ok(result) => { + remaining_relays = result; + }, + Err(_) => { + log::error!("Event {} failed to broadcast on relays {:?}", event.event.id, remaining_relays.failed()); + }, + } + + if attempt > 25.0 { + log::error!("Event {} failed to broadcast after 25 attempts", event.event.id); + break; + } + + } + }); + } + Ok(()) } @@ -1226,3 +1251,43 @@ impl std::fmt::Debug for ConversationBox { f.debug_struct("Conversation").finish() } } + + +#[derive(Clone)] +pub struct QueuedEvent { + event: Event, + relays: Option>, +} + +impl QueuedEvent { + pub fn new(event: Event ) -> Self { + Self { event, relays: None } + } + pub fn new_with_relays(event: Event, relays: HashSet) -> Self { + Self { event, relays: Some(relays) } + } + + pub async fn broadcast(&self, channel: &Arc) + -> Result + where + C::Error: From, + { + let relays = self.relays.clone(); + + let result = if let Some(relays) = relays { + channel.broadcast_to(relays, &self.event).await.map_err(|e| ConversationError::Inner(Box::new(e)))? + } else { + channel.broadcast(&self.event).await.map_err(|e| ConversationError::Inner(Box::new(e)))? + }; + Ok(result) + } + + pub async fn broadcast_remaining(&self, channel: &Arc, relays: &BroadcastResult) -> Result + where + C::Error: From, + { + let result = channel.broadcast_to(relays.failed().clone(), &self.event).await.map_err(|e| ConversationError::Inner(Box::new(e)))?; + Ok(result) + } + +} \ No newline at end of file diff --git a/src/router/channel.rs b/src/router/channel.rs index cbdfd76..0765426 100644 --- a/src/router/channel.rs +++ b/src/router/channel.rs @@ -1,3 +1,5 @@ +use std::collections::HashSet; + use nostr::{message::SubscriptionId, types::TryIntoUrl}; use nostr_relay_pool::{ RelayPool, RelayPoolNotification, SubscribeOptions, @@ -9,7 +11,7 @@ use crate::router::ids::PortalId; /// A trait for an abstract channel /// /// This is modeled around Nostr relays, in which we can subscribe to events matching a filter. -pub trait Channel: Send + 'static { +pub trait Channel: Send + Sync + 'static { type Error: std::error::Error + Send + Sync + 'static; fn subscribe( @@ -37,13 +39,13 @@ pub trait Channel: Send + 'static { fn broadcast( &self, - event: nostr::Event, - ) -> impl std::future::Future> + Send; + event: &nostr::Event, + ) -> impl std::future::Future> + Send; fn broadcast_to( &self, urls: I, - event: nostr::Event, - ) -> impl std::future::Future> + Send + event: &nostr::Event, + ) -> impl std::future::Future> + Send where ::IntoIter: Send, I: IntoIterator + Send, @@ -57,6 +59,23 @@ pub trait Channel: Send + 'static { fn shutdown(&self) -> impl std::future::Future> + Send; } +/// A result of a Nostr broadcast +#[derive(Debug, Clone)] +pub struct BroadcastResult { + failed: HashSet, +} + +impl BroadcastResult { + pub fn new(failed: HashSet) -> Self { + Self { failed } + } + + /// Get the failed relays + pub fn failed(&self) -> &HashSet { + &self.failed + } +} + impl Channel for RelayPool { type Error = nostr_relay_pool::pool::Error; @@ -113,19 +132,20 @@ impl Channel for RelayPool { Ok(()) } - async fn broadcast(&self, event: nostr::Event) -> Result<(), Self::Error> { - self.send_event(&event).await?; - Ok(()) + async fn broadcast(&self, event: &nostr::Event) -> Result { + let output = self.send_event(event).await?; + return Ok(BroadcastResult::new(output.failed.keys().map(|url| url.to_string()).collect())); } - async fn broadcast_to(&self, urls: I, event: nostr::Event) -> Result<(), Self::Error> + async fn broadcast_to(&self, urls: I, event: &nostr::Event) -> Result where ::IntoIter: Send, I: IntoIterator + Send, U: TryIntoUrl, Self::Error: From<::Err>, { - self.send_event_to(urls, &event).await?; - Ok(()) + let output = self.send_event_to(urls, event).await?; + + return Ok(BroadcastResult::new(output.failed.keys().map(|url| url.to_string()).collect())); } async fn receive(&self) -> Result { @@ -167,11 +187,11 @@ impl Channel for std::sync::Arc { ::unsubscribe(self, id).await } - async fn broadcast(&self, event: nostr::Event) -> Result<(), Self::Error> { + async fn broadcast(&self, event: &nostr::Event) -> Result { ::broadcast(self, event).await } - async fn broadcast_to(&self, urls: I, event: nostr::Event) -> Result<(), Self::Error> + async fn broadcast_to(&self, urls: I, event: &nostr::Event) -> Result where ::IntoIter: Send, I: IntoIterator + Send,