diff --git a/src/lib.rs b/src/lib.rs index 69fe378..bc1e9d1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1099,8 +1099,39 @@ pub mod channel { use crate::sealed::Sealed; /// A sender that does nothing. This is used when no communication is needed. - #[derive(Debug)] - pub struct NoSender; + /// + /// When created from a remote [`quinn::SendStream`], finishing the + /// stream is deferred until this value is dropped, signaling to the + /// remote peer that the request has been received. + pub struct NoSender { + #[cfg(feature = "rpc")] + pub(crate) stream: Option, + } + + impl NoSender { + pub(crate) fn new() -> Self { + Self { + #[cfg(feature = "rpc")] + stream: None, + } + } + } + + impl std::fmt::Debug for NoSender { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("NoSender").finish() + } + } + + #[cfg(feature = "rpc")] + impl Drop for NoSender { + fn drop(&mut self) { + if let Some(ref mut stream) = self.stream { + stream.finish().ok(); + } + } + } + impl Sealed for NoSender {} impl crate::Sender for NoSender {} @@ -1244,7 +1275,7 @@ where let (inner,) = inner; Self { inner, - tx: NoSender, + tx: NoSender::new(), rx: NoReceiver, #[cfg(feature = "spans")] #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "spans")))] @@ -1527,7 +1558,10 @@ impl Client { Request::Remote(_request) => unreachable!(), #[cfg(feature = "rpc")] Request::Remote(request) => { - let (_tx, _rx) = request.write(msg).await?; + let (mut tx, mut rx) = request.write(msg).await?; + tx.finish() + .map_err(|e| rpc::WriteError::from(quinn::WriteError::from(e)))?; + rx.received_reset().await?; } }; Ok(()) @@ -1559,14 +1593,19 @@ impl Client { Request::Remote(request) => { // see https://www.iroh.computer/blog/0rtt-api#connect-side let buf = rpc::prepare_write::(msg)?; - let (_tx, _rx) = request.write_raw(&buf).await?; + let (mut tx, mut rx) = request.write_raw(&buf).await?; if !this.0.zero_rtt_accepted().await { // 0rtt was not accepted, the data is lost, send it again! let Request::Remote(request) = this.request().await? else { unreachable!() }; - let (_tx, _rx) = request.write_raw(&buf).await?; + let (tx2, rx2) = request.write_raw(&buf).await?; + tx = tx2; + rx = rx2; } + tx.finish() + .map_err(|e| rpc::WriteError::from(quinn::WriteError::from(e)))?; + rx.received_reset().await?; } }; Ok(()) @@ -1749,6 +1788,12 @@ pub enum Error { #[cfg(feature = "rpc")] #[error("Recv error")] Write { source: rpc::WriteError }, + #[cfg(feature = "rpc")] + #[error("Reset error")] + Reset { + #[error(std_err)] + source: quinn::ResetError, + }, } /// Type alias for a result with an irpc error type. @@ -1763,6 +1808,8 @@ impl From for io::Error { Error::OneshotRecv { source, .. } => source.into(), #[cfg(feature = "rpc")] Error::Write { source, .. } => source.into(), + #[cfg(feature = "rpc")] + Error::Reset { source, .. } => source.into(), } } } @@ -2112,8 +2159,9 @@ pub mod rpc { impl From for NoSender { fn from(write: quinn::SendStream) -> Self { - let _ = write; - NoSender + NoSender { + stream: Some(write), + } } }