Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cli/src/bin/invoices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl InvoiceResponseListener for LogInvoiceResponseListener {
async fn main() -> Result<(), CliError> {
env_logger::init();

let relays = vec!["wss://relay.nostr.net".to_string()];
let relays = vec!["wss://miodominio.net".to_string()];

let (receiver_key, receiver) = create_app_instance(
"Receiver",
Expand Down Expand Up @@ -89,6 +89,6 @@ async fn main() -> Result<(), CliError> {

log::info!("Apps created");

tokio::time::sleep(std::time::Duration::from_secs(10)).await;
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
Ok(())
}
95 changes: 95 additions & 0 deletions cli/src/bin/payment_request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use std::{sync::Arc, time::Duration as StdDuration};

use app::{CallbackError, CashuRequestListener, PaymentRequestListener, PaymentStatusNotifier, RecurringPaymentRequest, SinglePaymentRequest};
use cli::{CliError, create_app_instance, create_sdk_instance};
use portal::protocol::model::{
payment::{Currency, RecurringPaymentResponseContent, RecurringPaymentStatus, SinglePaymentRequestContent}, Timestamp
};

struct LogPaymentRequestListener;

#[async_trait::async_trait]
impl PaymentRequestListener for LogPaymentRequestListener {
async fn on_single_payment_request(
&self,
event: SinglePaymentRequest,
notifier: Arc<dyn PaymentStatusNotifier>,
) -> Result<(), CallbackError> {
log::info!("Receiver: Received Payment request: {:?}", event);
// Always approve for test
Ok(())
}
async fn on_recurring_payment_request(
&self,
event: RecurringPaymentRequest,
) -> Result<RecurringPaymentResponseContent, CallbackError> {
log::info!("Receiver: Received Recurring Payment request: {:?}", event);
Ok(RecurringPaymentResponseContent {
request_id: event.content.request_id,
status: RecurringPaymentStatus::Rejected { reason: Some("User rejected".to_string()) },
})
}
}

#[tokio::main]
async fn main() -> Result<(), CliError> {
env_logger::init();

let relays = vec!["wss://relay.nostr.net".to_string()];

let (receiver_key, receiver) = create_app_instance(
"Receiver",
"mass derive myself benefit shed true girl orange family spawn device theme",
relays.clone(),
)
.await?;
let _receiver = receiver.clone();

tokio::spawn(async move {
log::info!("Receiver: Setting up Payment request listener");
_receiver
.listen_for_payment_request(Arc::new(LogPaymentRequestListener))
.await
.expect("Receiver: Error creating listener");
});

let sender_sdk = create_sdk_instance(
"draft sunny old taxi chimney ski tilt suffer subway bundle once story",
relays.clone(),
)
.await?;

log::info!("Apps created, waiting 5 seconds before sending request");
tokio::time::sleep(StdDuration::from_secs(5)).await;

let request_content = SinglePaymentRequestContent {
amount: 1000,
currency: Currency::Millisats,
current_exchange_rate: None,
invoice: "invoice".to_string(),
auth_token: None,
expires_at: Timestamp::now_plus_seconds(300),
subscription_id: None,
description: Some("test".to_string()),
request_id: "test".to_string(),
};

let mut response = sender_sdk
.request_single_payment(receiver_key.public_key().0, vec![], request_content)
.await
.unwrap();

while let Some(resp) = response.next().await {

match resp {
Ok(resp) => {
log::info!("Sender: Received payment: {:?}", resp);
}
Err(e) => {
log::error!("Sender: Error receiving payment: {}", e);
}
}
}

Ok(())
}
5 changes: 1 addition & 4 deletions cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@ pub async fn create_app_instance(

let app = PortalApp::new(
keypair.clone(),
vec![
"wss://relay.nostr.net".to_string(),
"wss://relay.damus.io".to_string(),
],
_relays,
Arc::new(LogRelayStatusChange),
)
.await?;
Expand Down
111 changes: 88 additions & 23 deletions src/router/actor.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
use core::f64;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};

// use std::time::Duration inside async blocks
use nostr::{
event::{Event, EventBuilder, Kind},
filter::{Filter, MatchEventOptions},
message::{RelayMessage, SubscriptionId},
nips::nip44,
event::{Event, EventBuilder, Kind}, filter::{Filter, MatchEventOptions}, message::{RelayMessage, SubscriptionId}, nips::nip44, types::RelayUrl
};
use nostr_relay_pool::RelayPoolNotification;
use serde::{Serialize, de::DeserializeOwned};
use tokio::sync::{mpsc, oneshot};
use tokio_stream::StreamExt;

use crate::{
protocol::{LocalKeypair, model::event_kinds::SUBKEY_PROOF},
protocol::{model::event_kinds::SUBKEY_PROOF, LocalKeypair},
router::{
CleartextEvent, Conversation, ConversationError, ConversationMessage, NotificationStream,
PortalId, Response, channel::Channel,
channel::{BroadcastResult, Channel}, CleartextEvent, Conversation, ConversationError, ConversationMessage, NotificationStream, PortalId, Response
},
};

Expand Down Expand Up @@ -857,12 +855,12 @@ impl MessageRouterActorState {
// check if Response has selected relays
if let Some(selected_relays) = selected_relays_optional {
for event in events_to_broadcast {
self.queue_event(channel, event, Some(selected_relays.clone()))
self.queue_event(channel, QueuedEvent::new_with_relays(event, selected_relays.clone()))
.await?;
}
} else {
for event in events_to_broadcast {
self.queue_event(channel, event, None).await?;
self.queue_event(channel, QueuedEvent::new(event)).await?;
}

// TODO: wait for confirmation from relays
Expand All @@ -879,25 +877,52 @@ impl MessageRouterActorState {
async fn queue_event<C: Channel>(
&self,
channel: &Arc<C>,
event: Event,
relays: Option<HashSet<String>>,
queue_event: QueuedEvent,
) -> Result<(), ConversationError>
where
C::Error: From<nostr::types::url::Error>,
{
if let Some(relays) = relays {
// if selected relays, broadcast to selected relays
channel
.broadcast_to(relays, event)
.await
.map_err(|e| ConversationError::Inner(Box::new(e)))?;
} else {
// if not selected relays, broadcast to all relays
channel
.broadcast(event)
.await
.map_err(|e| ConversationError::Inner(Box::new(e)))?;
let result = queue_event.broadcast(channel).await?;
if !result.failed().is_empty() {
// incrementally delay until it is confirmed
log::warn!("New event with id {} is queued, delaying until it is confirmed", queue_event.event.id);

let channel = channel.clone();
let event = queue_event.clone();

tokio::spawn(async move {
let mut attempt = 0.0;
let base_delay = 1.0; // seconds
let multiplier = f64::consts::E;
let mut remaining_relays = result;
while !remaining_relays.failed().is_empty() {
attempt += 1.0;

log::warn!("Attempt {} of event {} on relays {:?}", attempt, event.event.id, remaining_relays.failed());
let mut secs = base_delay * multiplier.powf(attempt);
if secs > 300.0 {
secs = 300.0;
}
tokio::time::sleep(std::time::Duration::from_secs_f64(secs)).await;
match event.broadcast_remaining(&channel, &remaining_relays).await {
Ok(result) => {
remaining_relays = result;
},
Err(_) => {
log::error!("Event {} failed to broadcast on relays {:?}", event.event.id, remaining_relays.failed());
},
}

if attempt > 25.0 {
log::error!("Event {} failed to broadcast after 25 attempts", event.event.id);
break;
}

}
});

}

Ok(())
}

Expand Down Expand Up @@ -1226,3 +1251,43 @@ impl std::fmt::Debug for ConversationBox {
f.debug_struct("Conversation").finish()
}
}


#[derive(Clone)]
pub struct QueuedEvent {
event: Event,
relays: Option<HashSet<String>>,
}

impl QueuedEvent {
pub fn new(event: Event ) -> Self {
Self { event, relays: None }
}
pub fn new_with_relays(event: Event, relays: HashSet<String>) -> Self {
Self { event, relays: Some(relays) }
}

pub async fn broadcast<C: Channel>(&self, channel: &Arc<C>)
-> Result<BroadcastResult, ConversationError>
where
C::Error: From<nostr::types::url::Error>,
{
let relays = self.relays.clone();

let result = if let Some(relays) = relays {
channel.broadcast_to(relays, &self.event).await.map_err(|e| ConversationError::Inner(Box::new(e)))?
} else {
channel.broadcast(&self.event).await.map_err(|e| ConversationError::Inner(Box::new(e)))?
};
Ok(result)
}

pub async fn broadcast_remaining<C: Channel>(&self, channel: &Arc<C>, relays: &BroadcastResult) -> Result<BroadcastResult, ConversationError>
where
C::Error: From<nostr::types::url::Error>,
{
let result = channel.broadcast_to(relays.failed().clone(), &self.event).await.map_err(|e| ConversationError::Inner(Box::new(e)))?;
Ok(result)
}

}
46 changes: 33 additions & 13 deletions src/router/channel.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashSet;

use nostr::{message::SubscriptionId, types::TryIntoUrl};
use nostr_relay_pool::{
RelayPool, RelayPoolNotification, SubscribeOptions,
Expand All @@ -9,7 +11,7 @@ use crate::router::ids::PortalId;
/// A trait for an abstract channel
///
/// This is modeled around Nostr relays, in which we can subscribe to events matching a filter.
pub trait Channel: Send + 'static {
pub trait Channel: Send + Sync + 'static {
type Error: std::error::Error + Send + Sync + 'static;

fn subscribe(
Expand Down Expand Up @@ -37,13 +39,13 @@ pub trait Channel: Send + 'static {

fn broadcast(
&self,
event: nostr::Event,
) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;
event: &nostr::Event,
) -> impl std::future::Future<Output = Result<BroadcastResult, Self::Error>> + Send;
fn broadcast_to<I, U>(
&self,
urls: I,
event: nostr::Event,
) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send
event: &nostr::Event,
) -> impl std::future::Future<Output = Result<BroadcastResult, Self::Error>> + Send
where
<I as IntoIterator>::IntoIter: Send,
I: IntoIterator<Item = U> + Send,
Expand All @@ -57,6 +59,23 @@ pub trait Channel: Send + 'static {
fn shutdown(&self) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;
}

/// A result of a Nostr broadcast
#[derive(Debug, Clone)]
pub struct BroadcastResult {
failed: HashSet<String>,
}

impl BroadcastResult {
pub fn new(failed: HashSet<String>) -> Self {
Self { failed }
}

/// Get the failed relays
pub fn failed(&self) -> &HashSet<String> {
&self.failed
}
}

impl Channel for RelayPool {
type Error = nostr_relay_pool::pool::Error;

Expand Down Expand Up @@ -113,19 +132,20 @@ impl Channel for RelayPool {
Ok(())
}

async fn broadcast(&self, event: nostr::Event) -> Result<(), Self::Error> {
self.send_event(&event).await?;
Ok(())
async fn broadcast(&self, event: &nostr::Event) -> Result<BroadcastResult, Self::Error> {
let output = self.send_event(event).await?;
return Ok(BroadcastResult::new(output.failed.keys().map(|url| url.to_string()).collect()));
}
async fn broadcast_to<I, U>(&self, urls: I, event: nostr::Event) -> Result<(), Self::Error>
async fn broadcast_to<I, U>(&self, urls: I, event: &nostr::Event) -> Result<BroadcastResult, Self::Error>
where
<I as IntoIterator>::IntoIter: Send,
I: IntoIterator<Item = U> + Send,
U: TryIntoUrl,
Self::Error: From<<U as TryIntoUrl>::Err>,
{
self.send_event_to(urls, &event).await?;
Ok(())
let output = self.send_event_to(urls, event).await?;

return Ok(BroadcastResult::new(output.failed.keys().map(|url| url.to_string()).collect()));
}

async fn receive(&self) -> Result<RelayPoolNotification, Self::Error> {
Expand Down Expand Up @@ -167,11 +187,11 @@ impl<C: Channel + Send + Sync> Channel for std::sync::Arc<C> {
<C as Channel>::unsubscribe(self, id).await
}

async fn broadcast(&self, event: nostr::Event) -> Result<(), Self::Error> {
async fn broadcast(&self, event: &nostr::Event) -> Result<BroadcastResult, Self::Error> {
<C as Channel>::broadcast(self, event).await
}

async fn broadcast_to<I, U>(&self, urls: I, event: nostr::Event) -> Result<(), Self::Error>
async fn broadcast_to<I, U>(&self, urls: I, event: &nostr::Event) -> Result<BroadcastResult, Self::Error>
where
<I as IntoIterator>::IntoIter: Send,
I: IntoIterator<Item = U> + Send,
Expand Down