diff --git a/iroh-relay/src/protos/common.rs b/iroh-relay/src/protos/common.rs
index c5b6b157b4..49597420cf 100644
--- a/iroh-relay/src/protos/common.rs
+++ b/iroh-relay/src/protos/common.rs
@@ -53,6 +53,9 @@ pub enum FrameType {
     /// Payload is two big endian u32 durations in milliseconds: when to reconnect,
     /// and how long to try total.
     Restarting = 12,
+    /// Sent from server to client when the server closes the connection.
+    /// Contains the reason for closing.
+    Close = 13,
 }
 
 #[stack_error(derive, add_meta)]
diff --git a/iroh-relay/src/protos/relay.rs b/iroh-relay/src/protos/relay.rs
index ea50eee9eb..901cde30d8 100644
--- a/iroh-relay/src/protos/relay.rs
+++ b/iroh-relay/src/protos/relay.rs
@@ -93,6 +93,10 @@ pub enum RelayToClientMsg {
         /// until a problem exists.
         problem: String,
     },
+    #[deprecated(
+        since = "0.97.0",
+        note = "Frame is no longer used but kept in place for wire backwards compatibility"
+    )]
     /// A one-way message from relay to client, advertising that the relay is restarting.
     Restarting {
         /// An advisory duration that the client should wait before attempting to reconnect.
@@ -110,6 +114,41 @@ pub enum RelayToClientMsg {
     /// Reply to a [`ClientToRelayMsg::Ping`] from a client
     /// with the payload sent previously in the ping.
     Pong([u8; 8]),
+    /// Sent from the server before it closes the connection.
+    Close {
+        /// Contains the reason why the server chose to close the connection.
+        reason: CloseReason,
+    },
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+/// Reason why a relay server closes a connection to a client.
+pub enum CloseReason {
+    /// The relay server is shutting down.
+    Shutdown,
+    /// Another endpoint with the same endpoint id connected to the relay server.
+    ///
+    /// When a new connection comes in from an endpoint id for which the server already has a connection,
+    /// the previous connection is terminated with this error.
+    SameEndpointIdConnected,
+}
+
+impl CloseReason {
+    #[cfg(feature = "server")]
+    fn to_u8(&self) -> u8 {
+        match self {
+            CloseReason::Shutdown => 0,
+            CloseReason::SameEndpointIdConnected => 1,
+        }
+    }
+
+    fn try_from_u8(value: u8) -> Option<Self> {
+        match value {
+            0 => Some(CloseReason::Shutdown),
+            1 => Some(CloseReason::SameEndpointIdConnected),
+            _ => None,
+        }
+    }
 }
 
 /// Messages that clients send to relays.
@@ -258,7 +297,9 @@ impl RelayToClientMsg {
             Self::Ping { .. } => FrameType::Ping,
             Self::Pong { .. } => FrameType::Pong,
             Self::Health { .. } => FrameType::Health,
+            #[allow(deprecated, reason = "kept for wire backwards compatibility")]
             Self::Restarting { .. } => FrameType::Restarting,
+            Self::Close { .. } => FrameType::Close,
         }
     }
 
@@ -293,6 +334,7 @@ impl RelayToClientMsg {
             Self::Health { problem } => {
                 dst.put(problem.as_ref());
             }
+            #[allow(deprecated, reason = "kept for wire backwards compatibility")]
             Self::Restarting {
                 reconnect_in,
                 try_for,
@@ -300,6 +342,9 @@ impl RelayToClientMsg {
                 dst.put_u32(reconnect_in.as_millis() as u32);
                 dst.put_u32(try_for.as_millis() as u32);
             }
+            Self::Close { reason } => {
+                dst.put_u8(reason.to_u8());
+            }
         }
         dst
     }
@@ -314,10 +359,12 @@ impl RelayToClientMsg {
             Self::EndpointGone(_) => 32,
             Self::Ping(_) | Self::Pong(_) => 8,
             Self::Health { problem } => problem.len(),
+            #[allow(deprecated, reason = "kept for wire backwards compatibility")]
             Self::Restarting { .. } => {
                 4 // u32
                 + 4 // u32
             }
+            Self::Close { .. } => 1,
         };
         self.typ().encoded_len() + payload_len
     }
@@ -383,11 +430,19 @@ impl RelayToClientMsg {
                 );
                 let reconnect_in = Duration::from_millis(reconnect_in as u64);
                 let try_for = Duration::from_millis(try_for as u64);
+                #[allow(deprecated, reason = "kept for wire backwards compatibility")]
                 Self::Restarting {
                     reconnect_in,
                     try_for,
                 }
             }
+            FrameType::Close => {
+                ensure!(content.len() == 1, Error::InvalidFrame);
+                let value = content.get_u8();
+                let reason =
+                    CloseReason::try_from_u8(value).ok_or_else(|| e!(Error::InvalidFrame))?;
+                Self::Close { reason }
+            }
             _ => {
                 return Err(e!(Error::InvalidFrameType { frame_type }));
             }
@@ -592,6 +647,7 @@ mod tests {
                 48 65 6c 6c 6f 20 57 6f 72 6c 64 21",
             ),
             (
+                #[allow(deprecated)]
                 RelayToClientMsg::Restarting {
                     reconnect_in: Duration::from_millis(10),
                     try_for: Duration::from_millis(20),
@@ -725,6 +781,7 @@ mod proptests {
             })
             .prop_map(|problem| RelayToClientMsg::Health { problem });
         let restarting = (any::<u32>(), any::<u32>()).prop_map(|(reconnect_in, try_for)| {
+            #[allow(deprecated)]
             RelayToClientMsg::Restarting {
                 reconnect_in: Duration::from_millis(reconnect_in.into()),
                 try_for: Duration::from_millis(try_for.into()),
diff --git a/iroh-relay/src/server/client.rs b/iroh-relay/src/server/client.rs
index bf603ca382..b47c95d714 100644
--- a/iroh-relay/src/server/client.rs
+++ b/iroh-relay/src/server/client.rs
@@ -1,6 +1,10 @@
 //! The server-side representation of an ongoing client relaying connection.
 
-use std::{collections::HashSet, sync::Arc, time::Duration};
+use std::{
+    collections::HashSet,
+    sync::{Arc, Mutex},
+    time::Duration,
+};
 
 use iroh_base::EndpointId;
 use n0_error::{e, stack_error};
@@ -8,16 +12,19 @@ use n0_future::{SinkExt, StreamExt};
 use rand::Rng;
 use time::{Date, OffsetDateTime};
 use tokio::{
-    sync::mpsc::{self, error::TrySendError},
+    sync::{
+        mpsc::{self, error::TrySendError},
+        oneshot,
+    },
     time::MissedTickBehavior,
 };
-use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle};
+use tokio_util::task::AbortOnDropHandle;
 use tracing::{Instrument, debug, trace, warn};
 
 use crate::{
     PingTracker,
     protos::{
-        relay::{ClientToRelayMsg, Datagrams, PING_INTERVAL, RelayToClientMsg},
+        relay::{ClientToRelayMsg, CloseReason, Datagrams, PING_INTERVAL, RelayToClientMsg},
         streams::BytesStreamSink,
     },
     server::{
@@ -62,7 +69,7 @@ pub struct Client {
     /// Connection identifier.
     connection_id: u64,
     /// Used to close the connection loop.
-    done: CancellationToken,
+    done_s: Mutex<Option<oneshot::Sender<Option<CloseReason>>>>,
     /// Actor handle.
     handle: AbortOnDropHandle<()>,
     /// Queue of packets intended for the client.
@@ -91,10 +98,9 @@ impl Client {
             channel_capacity,
         } = config;
 
-        let done = CancellationToken::new();
         let (send_queue_s, send_queue_r) = mpsc::channel(channel_capacity);
-
         let (peer_gone_s, peer_gone_r) = mpsc::channel(channel_capacity);
+        let (done_s, done_r) = oneshot::channel();
 
         let actor = Actor {
             stream,
@@ -110,8 +116,7 @@ impl Client {
         };
 
         // start io loop
-        let io_done = done.clone();
-        let handle = tokio::task::spawn(actor.run(io_done).instrument(tracing::info_span!(
+        let handle = tokio::task::spawn(actor.run(done_r).instrument(tracing::info_span!(
            "client-connection-actor",
             remote_endpoint = %endpoint_id.fmt_short(),
             connection_id = connection_id
@@ -121,7 +126,7 @@ impl Client {
             endpoint_id,
             connection_id,
             handle: AbortOnDropHandle::new(handle),
-            done,
+            done_s: Mutex::new(Some(done_s)),
             send_queue: send_queue_s,
             peer_gone: peer_gone_s,
         }
@@ -134,8 +139,8 @@ impl Client {
     /// Shutdown the reader and writer loops and closes the connection.
     ///
    /// Any shutdown errors will be logged as warnings.
-    pub(super) async fn shutdown(self) {
-        self.start_shutdown();
+    pub(super) async fn shutdown(self, reason: CloseReason) {
+        self.start_shutdown(Some(reason));
         if let Err(e) = self.handle.await {
             warn!(
                 remote_endpoint = %self.endpoint_id.fmt_short(),
@@ -145,8 +150,10 @@ impl Client {
     }
 
     /// Starts the process of shutdown.
-    pub(super) fn start_shutdown(&self) {
-        self.done.cancel();
+    pub(super) fn start_shutdown(&self, reason: Option<CloseReason>) {
+        if let Some(sender) = self.done_s.lock().expect("poisoned").take() {
+            sender.send(reason).ok();
+        }
     }
 
     pub(super) fn try_send_packet(
@@ -205,7 +212,7 @@ pub enum RunError {
         source: ForwardPacketError,
     },
     #[error("Flush")]
-    Flush {},
+    CloseFlush {},
     #[error(transparent)]
     HandleFrame {
         #[error(from)]
@@ -217,10 +224,8 @@
     PacketSend { source: WriteFrameError },
     #[error("Server.endpoint_gone dropped")]
     EndpointGoneDrop {},
-    #[error("EndpointGone write frame failed")]
-    EndpointGoneWriteFrame { source: WriteFrameError },
-    #[error("Keep alive write frame failed")]
-    KeepAliveWriteFrame { source: WriteFrameError },
+    #[error("Writing frame failed")]
+    WriteFrame { source: WriteFrameError },
     #[error("Tick flush")]
     TickFlush {},
 }
@@ -268,7 +273,7 @@ impl<S> Actor<S>
 where
     S: BytesStreamSink,
 {
-    async fn run(mut self, done: CancellationToken) {
+    async fn run(mut self, done_r: oneshot::Receiver<Option<CloseReason>>) {
         // Note the accept and disconnects metrics must be in a pair. Technically the
         // connection is accepted long before this in the HTTP server, but it is clearer to
         // handle the metric here.
@@ -276,7 +281,7 @@ where
         if self.client_counter.update(self.endpoint_id) {
             self.metrics.unique_client_keys.inc();
         }
-        match self.run_inner(done).await {
+        match self.run_inner(done_r).await {
             Err(e) => {
                 warn!("actor errored {e:#}, exiting");
             }
@@ -290,7 +295,10 @@ where
         self.metrics.disconnects.inc();
     }
 
-    async fn run_inner(&mut self, done: CancellationToken) -> Result<(), RunError> {
+    async fn run_inner(
+        &mut self,
+        mut done_r: oneshot::Receiver<Option<CloseReason>>,
+    ) -> Result<(), RunError> {
         // Add some jitter to ping pong interactions, to avoid all pings being sent at the same time
         let next_interval = || {
             let random_secs = rand::rng().random_range(1..=5);
@@ -306,10 +314,16 @@ where
             tokio::select! {
                 biased;
 
-                _ = done.cancelled() => {
-                    trace!("actor loop cancelled, exiting");
-                    // final flush
-                    self.stream.flush().await.map_err(|_| e!(RunError::Flush))?;
+                reason = &mut done_r => {
+                    trace!("actor loop cancelled, exiting (reason: {reason:?})");
+                    if let Ok(Some(reason)) = reason {
+                        self.write_frame(RelayToClientMsg::Close { reason }).await
+                            .map_err(|err| e!(RunError::WriteFrame, err))?;
+                    }
+                    self.stream
+                        .flush()
+                        .await
+                        .map_err(|_| e!(RunError::CloseFlush))?;
                     break;
                 }
                 maybe_frame = self.stream.next() => {
@@ -332,7 +346,7 @@ where
                    trace!("endpoint_id gone: {:?}", endpoint_id);
                     self.write_frame(RelayToClientMsg::EndpointGone(endpoint_id))
                         .await
-                        .map_err(|err| e!(RunError::EndpointGoneWriteFrame, err))?;
+                        .map_err(|err| e!(RunError::WriteFrame, err))?;
                 }
                 _ = self.ping_tracker.timeout() => {
                     trace!("pong timed out");
@@ -345,7 +359,7 @@ where
                     let data = self.ping_tracker.new_ping();
                     self.write_frame(RelayToClientMsg::Ping(data))
                         .await
-                        .map_err(|err| e!(RunError::KeepAliveWriteFrame, err))?;
+                        .map_err(|err| e!(RunError::WriteFrame, err))?;
                 }
             }
 
@@ -538,6 +552,7 @@ mod tests {
 
         let (send_queue_s, send_queue_r) = mpsc::channel(10);
         let (peer_gone_s, peer_gone_r) = mpsc::channel(10);
+        let (done_s, done_r) = oneshot::channel();
 
         let endpoint_id = SecretKey::generate(&mut rng).public();
         let (io, io_rw) = tokio::io::duplex(1024);
@@ -559,9 +574,7 @@ mod tests {
             metrics,
         };
 
-        let done = CancellationToken::new();
-        let io_done = done.clone();
-        let handle = tokio::task::spawn(async move { actor.run(io_done).await });
+        let handle = tokio::task::spawn(async move { actor.run(done_r).await });
 
         // Write tests
         println!("-- write");
@@ -621,7 +634,7 @@ mod tests {
             .await
             .std_context("send")?;
 
-        done.cancel();
+        done_s.send(None).ok();
         handle.await.std_context("join")?;
         Ok(())
     }
diff --git a/iroh-relay/src/server/clients.rs b/iroh-relay/src/server/clients.rs
index 47a53aa40f..327f039338 100644
--- a/iroh-relay/src/server/clients.rs
+++ b/iroh-relay/src/server/clients.rs
@@ -16,7 +16,10 @@ use tracing::{debug, trace};
 
 use super::client::{Client, Config, ForwardPacketError};
 use crate::{
-    protos::{relay::Datagrams, streams::BytesStreamSink},
+    protos::{
+        relay::{CloseReason, Datagrams},
+        streams::BytesStreamSink,
+    },
     server::{client::SendError, metrics::Metrics},
 };
 
@@ -49,8 +52,10 @@
         trace!("shutting down {} clients", keys.len());
         let clients = keys.into_iter().filter_map(|k| self.0.clients.remove(&k));
 
-        n0_future::join_all(clients.map(|(_, client)| async move { client.shutdown().await }))
-            .await;
+        n0_future::join_all(
+            clients.map(|(_, client)| async move { client.shutdown(CloseReason::Shutdown).await }),
+        )
+        .await;
     }
 
     /// Builds the client handler and starts the read & write loops for the connection.
@@ -68,7 +73,9 @@ impl Clients {
                 remote_endpoint = %endpoint_id.fmt_short(),
                 "multiple connections found, pruning old connection",
             );
-            old_client.shutdown().await;
+            old_client
+                .shutdown(CloseReason::SameEndpointIdConnected)
+                .await;
         }
     }
 
@@ -144,7 +151,7 @@ impl Clients {
                     dst = %dst.fmt_short(),
                     "can no longer write to client, dropping message and pruning connection"
                 );
-                client.start_shutdown();
+                client.start_shutdown(None);
                 Err(ForwardPacketError::new(SendError::Closed))
             }
         }
@@ -158,6 +165,7 @@ mod tests {
     use iroh_base::SecretKey;
     use n0_error::{Result, StdResultExt};
     use n0_future::{Stream, StreamExt};
+    use n0_tracing_test::traced_test;
     use rand::SeedableRng;
 
     use super::*;
@@ -206,6 +214,7 @@
     }
 
     #[tokio::test]
+    #[traced_test]
     async fn test_clients() -> Result {
         let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
         let a_key = SecretKey::generate(&mut rng).public();
@@ -232,7 +241,7 @@
         {
             let client = clients.0.clients.get(&a_key).unwrap();
             // shutdown client a, this should trigger the removal from the clients list
-            client.start_shutdown();
+            client.start_shutdown(None);
         }
 
         // need to wait a moment for the removal to be processed
diff --git a/iroh-relay/src/server/http_server.rs b/iroh-relay/src/server/http_server.rs
index be88e04b08..ed5a7a2185 100644
--- a/iroh-relay/src/server/http_server.rs
+++ b/iroh-relay/src/server/http_server.rs
@@ -1003,7 +1003,7 @@ mod tests {
     use crate::{
         client::{Client, ClientBuilder, ConnectError, conn::Conn},
         dns::DnsResolver,
-        protos::relay::{ClientToRelayMsg, Datagrams, RelayToClientMsg},
+        protos::relay::{ClientToRelayMsg, CloseReason, Datagrams, RelayToClientMsg},
     };
 
     pub(crate) fn make_tls_config() -> TlsConfig {
@@ -1323,6 +1323,12 @@ mod tests {
             })
             .await;
         assert!(res.is_err());
+        assert!(matches!(
+            client_b.next().await,
+            Some(Ok(RelayToClientMsg::Close {
+                reason: CloseReason::Shutdown
+            }))
+        ));
         assert!(client_b.next().await.is_none());
 
         drop(client_a);
@@ -1474,6 +1480,12 @@ mod tests {
             })
             .await;
         assert!(res.is_err());
+        assert!(matches!(
+            new_client_b.next().await,
+            Some(Ok(RelayToClientMsg::Close {
+                reason: CloseReason::Shutdown
+            }))
+        ));
         assert!(new_client_b.next().await.is_none());
         Ok(())
     }
diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs
index 99af084aa4..c72314c3e5 100644
--- a/iroh/src/endpoint.rs
+++ b/iroh/src/endpoint.rs
@@ -1531,7 +1531,7 @@ mod tests {
     use n0_watcher::Watcher;
     use rand::SeedableRng;
     use tokio::sync::oneshot;
-    use tracing::{Instrument, debug_span, info, info_span, instrument};
+    use tracing::{Instrument, debug_span, error_span, info, info_span, instrument};
 
     use super::Endpoint;
     use crate::{
@@ -3058,4 +3058,54 @@
 
         Ok(())
     }
+
+    /// Tests that correct logs are emitted when connecting two endpoints with same secret keys to a relay.
+    #[tokio::test]
+    #[traced_test]
+    async fn same_endpoint_id_relay() -> Result {
+        let (relay_map, _relay_url, _relay_server_guard) = run_relay_server().await?;
+        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(1u64);
+        let secret_key = SecretKey::generate(&mut rng);
+
+        // bind ep1 and wait until connected to relay.
+        let ep1 = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
+            .secret_key(secret_key.clone())
+            .insecure_skip_relay_cert_verify(true)
+            .bind()
+            .instrument(error_span!("ep1"))
+            .await?;
+        ep1.online().await;
+
+        // now start second endpoint with same secret key
+        let ep2 = Endpoint::empty_builder(RelayMode::Custom(relay_map))
+            .secret_key(secret_key.clone())
+            .insecure_skip_relay_cert_verify(true)
+            .bind()
+            .instrument(error_span!("ep2"))
+            .await?;
+        ep2.online().await;
+        tokio::time::sleep(Duration::from_secs(1)).await;
+
+        // We assert that we get the error log once for endpoint 1, and not at all for endpoint 2.
+        logs_assert(|logs| {
+            let expected_line = |line: &str| {
+                line.contains("ERROR") && line.contains("Relay connection was closed because another endpoint with the same secret key connected to it")
+            };
+            let count_line_ep1 = logs
+                .iter()
+                .filter(|line| line.contains(":ep1:") && expected_line(line))
+                .count();
+            let count_line_ep2 = logs
+                .iter()
+                .filter(|line| line.contains(":ep2:") && expected_line(line))
+                .count();
+            if count_line_ep1 == 1 && count_line_ep2 == 0 {
+                Ok(())
+            } else {
+                Err("Logs don't match expectations".to_string())
+            }
+        });
+        tokio::join!(ep1.close(), ep2.close());
+        Ok(())
+    }
 }
diff --git a/iroh/src/socket/transports/relay/actor.rs b/iroh/src/socket/transports/relay/actor.rs
index f7b1d8caa3..74faaa2326 100644
--- a/iroh/src/socket/transports/relay/actor.rs
+++ b/iroh/src/socket/transports/relay/actor.rs
@@ -42,7 +42,7 @@ use iroh_base::{EndpointId, RelayUrl, SecretKey};
 use iroh_relay::{
     self as relay, PingTracker,
     client::{Client, ConnectError, RecvError, SendError},
-    protos::relay::{ClientToRelayMsg, Datagrams, RelayToClientMsg},
+    protos::relay::{ClientToRelayMsg, CloseReason, Datagrams, RelayToClientMsg},
 };
 use n0_error::{e, stack_error};
 use n0_future::{
@@ -220,6 +220,16 @@ enum RelayConnectionError {
     Established { source: RunError },
 }
 
+impl RelayConnectionError {
+    fn as_run_error(&self) -> Option<&RunError> {
+        match self {
+            RelayConnectionError::Dial { .. } => None,
+            RelayConnectionError::Handshake { source, .. } => Some(source),
+            RelayConnectionError::Established { source, .. } => Some(source),
+        }
+    }
+}
+
 #[allow(missing_docs)]
 #[stack_error(derive, add_meta)]
 enum RunError {
@@ -243,6 +253,8 @@ enum RunError {
         #[error(std_err)]
         source: SendError,
     },
+    #[error("Relay server closed the connection: {reason:?}")]
+    ServerClosed { reason: CloseReason },
 }
 
 #[allow(missing_docs)]
@@ -317,8 +329,30 @@ impl ActiveRelayActor {
         let mut backoff = Self::build_backoff();
 
         while let Err(err) = self.run_once().await {
-            warn!("{err}");
+            warn!("{err:#}");
+
             match err {
+                _ if matches!(
+                    err.as_run_error(),
+                    Some(RunError::ServerClosed {
+                        reason: CloseReason::SameEndpointIdConnected,
+                        ..
+                    })
+                ) =>
+                {
+                    error!(
+                        url=%self.url,
+                        "Relay connection was closed because another endpoint with the same secret key connected to it. \
+                        Creating multiple endpoints with the same secret key is likely a bug in your usage of iroh. \
+                        This relay connection will remain pending infinitely."
+                    );
+                    // If the relay disconnected us because another endpoint with the same secret key connected afterwards,
+                    // we don't fight our successor and instead give up.
+                    // If we would `break` here, the relay actor would very likely be restarted again right away.
+                    // The home relay actor is restarted regularly if dropped in `RelayActor::reap_active_relays`.
+                    // Other relay actors are restarted on-demand.
+                    std::future::pending::<()>().await;
+                }
                 RelayConnectionError::Dial { .. } | RelayConnectionError::Handshake { .. } => {
                     // If dialing failed, or if the relay connection failed before we received a pong,
                     // we wait an exponentially increasing time until we attempt to reconnect again.
@@ -616,7 +650,7 @@ impl ActiveRelayActor {
                     };
                     match msg {
                        Ok(msg) => {
-                            self.handle_relay_msg(msg, &mut state);
+                            self.handle_relay_msg(msg, &mut state).map_err(|err| state.map_err(err))?;
                            // reset the ping timer, we have just received a message
                             ping_interval.reset();
                         },
@@ -639,7 +673,11 @@ impl ActiveRelayActor {
         res.map_err(|err| state.map_err(err))
     }
 
-    fn handle_relay_msg(&mut self, msg: RelayToClientMsg, state: &mut ConnectedRelayState) {
+    fn handle_relay_msg(
+        &mut self,
+        msg: RelayToClientMsg,
+        state: &mut ConnectedRelayState,
+    ) -> Result<(), RunError> {
         match msg {
             RelayToClientMsg::Datagrams {
                 remote_endpoint_id,
@@ -687,10 +725,16 @@ impl ActiveRelayActor {
             RelayToClientMsg::Health { problem } => {
                 warn!("Relay server reports problem: {problem}");
             }
+            #[allow(deprecated)]
             RelayToClientMsg::Restarting { .. } => {
                 trace!("Ignoring {msg:?}")
             }
+            RelayToClientMsg::Close { reason } => {
+                warn!("Relay server is closing connection: {reason:?}");
+                return Err(e!(RunError::ServerClosed { reason }));
+            }
         }
+        Ok(())
     }
 
     /// Run the actor main loop while sending to the relay server.
@@ -751,7 +795,7 @@ impl ActiveRelayActor {
                         break Err(e!(RunError::StreamClosedServer));
                     };
                     match msg {
-                        Ok(msg) => self.handle_relay_msg(msg, state),
+                        Ok(msg) => self.handle_relay_msg(msg, state).map_err(|err| state.map_err(err))?,
                         Err(err) => break Err(e!(RunError::ClientStreamRead, err)),
                     }
                 }