Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions client/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,34 @@ where
self.body.size_hint()
}
}

impl Default for RequestBody {
fn default() -> Self {
Self::None
}
}

pub enum RequestBody {
Reusable(Bytes),
Stream(Option<BoxBody>),
None,
}

impl RequestBody {
pub fn as_stream(&mut self) -> BoxBody {
match self {
Self::Reusable(bytes) => BoxBody::new(Once::new(Bytes::clone(bytes))),
Self::None => BoxBody::new(NoneBody::default()),
Self::Stream(stream) => {
let stream = stream.take();

stream.unwrap_or_else(|| BoxBody::new(NoneBody::default()))
}
}
}

pub fn into_reusable(self) -> Self {
// @TODO ?
unimplemented!()
}
}
4 changes: 2 additions & 2 deletions client/src/middleware/redirect.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
body::BoxBody,
body::RequestBody,
error::{Error, InvalidUri},
http::{
header::{CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TYPE, LOCATION, TRANSFER_ENCODING},
Expand Down Expand Up @@ -41,7 +41,7 @@ where
method = Method::GET;
}

*req.body_mut() = BoxBody::default();
*req.body_mut() = RequestBody::default();

for header in &[TRANSFER_ENCODING, CONTENT_ENCODING, CONTENT_TYPE, CONTENT_LENGTH] {
headers.remove(header);
Expand Down
18 changes: 7 additions & 11 deletions client/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use core::{marker::PhantomData, time::Duration};
use futures_core::Stream;

use crate::{
body::{BodyError, BoxBody, Once},
body::{BodyError, BoxBody, RequestBody},
bytes::Bytes,
client::Client,
error::Error,
Expand All @@ -18,7 +18,7 @@ use crate::{

/// builder type for [http::Request] with extended functionalities.
pub struct RequestBuilder<'a, M = marker::Http> {
pub(crate) req: http::Request<BoxBody>,
pub(crate) req: http::Request<RequestBody>,
err: Vec<Error>,
client: &'a Client,
timeout: Duration,
Expand Down Expand Up @@ -74,7 +74,7 @@ impl RequestBuilder<'_, marker::Http> {
let bytes = Bytes::from(body);
let val = HeaderValue::from(bytes.len());
self.headers_mut().insert(CONTENT_LENGTH, val);
self.map_body(Once::new(bytes))
self.map_body(RequestBody::Reusable(bytes))
}

/// Use streaming type as request body.
Expand All @@ -84,7 +84,7 @@ impl RequestBuilder<'_, marker::Http> {
B: Stream<Item = Result<Bytes, E>> + Send + 'static,
E: Into<BodyError>,
{
self.map_body(body)
self.map_body(RequestBody::Stream(Some(BoxBody::new(body))))
}

/// Finish request builder and send it to server.
Expand All @@ -100,7 +100,7 @@ impl<'a, M> RequestBuilder<'a, M> {
E: Into<BodyError>,
{
Self {
req: req.map(BoxBody::new),
req: req.map(|_| RequestBody::default()),
err: Vec::new(),
client,
timeout: client.timeout_config.request_timeout,
Expand Down Expand Up @@ -210,12 +210,8 @@ impl<'a, M> RequestBuilder<'a, M> {
self
}

fn map_body<B, E>(mut self, b: B) -> RequestBuilder<'a, M>
where
B: Stream<Item = Result<Bytes, E>> + Send + 'static,
E: Into<BodyError>,
{
self.req = self.req.map(|_| BoxBody::new(b));
fn map_body(mut self, b: RequestBody) -> RequestBuilder<'a, M> {
self.req = self.req.map(|_| b);
self
}
}
33 changes: 20 additions & 13 deletions client/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use core::{future::Future, pin::Pin, time::Duration};

use crate::{
body::BoxBody,
body::RequestBody,
client::Client,
connect::Connect,
error::Error,
Expand Down Expand Up @@ -65,7 +65,7 @@ where
///
/// [RequestBuilder]: crate::request::RequestBuilder
pub struct ServiceRequest<'r, 'c> {
pub req: &'r mut Request<BoxBody>,
pub req: &'r mut Request<RequestBody>,
pub client: &'c Client,
pub timeout: Duration,
}
Expand All @@ -86,13 +86,20 @@ pub(crate) fn base_service() -> HttpService {
use crate::{error::TimeoutError, timeout::Timeout};

let ServiceRequest { req, client, timeout } = req;
let cloned_body = match req.body() {
RequestBody::Reusable(body) => RequestBody::Reusable(body.clone()),
_ => RequestBody::default(),
};

let uri = Uri::try_parse(req.uri())?;
let mut send_req = core::mem::take(req).map(|mut b| b.as_stream());
*req.body_mut() = cloned_body;

let uri = Uri::try_parse(send_req.uri())?;

// temporary version to record possible version downgrade/upgrade happens when making connections.
// alpn protocol and alt-svc header are possible source of version change.
#[allow(unused_mut)]
let mut version = req.version();
let mut version = send_req.version();

let mut connect = Connect::new(uri);

Expand All @@ -103,12 +110,12 @@ pub(crate) fn base_service() -> HttpService {
Version::HTTP_2 | Version::HTTP_3 => match client.shared_pool.acquire(&connect.uri).await {
shared::AcquireOutput::Conn(mut _conn) => {
let mut _timer = Box::pin(tokio::time::sleep(timeout));
*req.version_mut() = version;
*send_req.version_mut() = version;
#[allow(unreachable_code)]
return match _conn.conn {
#[cfg(feature = "http2")]
crate::connection::ConnectionShared::H2(ref mut conn) => {
match crate::h2::proto::send(conn, _date, core::mem::take(req))
match crate::h2::proto::send(conn, _date, send_req)
.timeout(_timer.as_mut())
.await
{
Expand All @@ -128,7 +135,7 @@ pub(crate) fn base_service() -> HttpService {
}
#[cfg(feature = "http3")]
crate::connection::ConnectionShared::H3(ref mut conn) => {
let res = crate::h3::proto::send(conn, _date, core::mem::take(req))
let res = crate::h3::proto::send(conn, _date, send_req)
.timeout(_timer.as_mut())
.await
.map_err(|_| TimeoutError::Request)??;
Expand Down Expand Up @@ -214,12 +221,12 @@ pub(crate) fn base_service() -> HttpService {
},
version => match client.exclusive_pool.acquire(&connect.uri).await {
exclusive::AcquireOutput::Conn(mut _conn) => {
*req.version_mut() = version;
*send_req.version_mut() = version;

#[cfg(feature = "http1")]
{
let mut timer = Box::pin(tokio::time::sleep(timeout));
let res = crate::h1::proto::send(&mut *_conn, _date, req)
let res = crate::h1::proto::send(&mut *_conn, _date, &mut send_req)
.timeout(timer.as_mut())
.await;

Expand Down Expand Up @@ -273,7 +280,7 @@ mod test {
use std::sync::Arc;

use crate::{
body::{BoxBody, ResponseBody},
body::{RequestBody, ResponseBody},
client::Client,
error::Error,
http::{self, Request},
Expand All @@ -293,14 +300,14 @@ mod test {

pub(crate) struct HttpServiceMockHandle(Client);

type HandlerFn = Arc<dyn Fn(Request<BoxBody>) -> Result<http::Response<ResponseBody>, Error> + Send + Sync>;
type HandlerFn = Arc<dyn Fn(Request<RequestBody>) -> Result<http::Response<ResponseBody>, Error> + Send + Sync>;

impl HttpServiceMockHandle {
/// compose a service request with given http request and it's mocked server side handler function
pub(crate) fn mock<'r, 'c>(
&'c self,
req: &'r mut Request<BoxBody>,
handler: impl Fn(Request<BoxBody>) -> Result<http::Response<ResponseBody>, Error> + Send + Sync + 'static,
req: &'r mut Request<RequestBody>,
handler: impl Fn(Request<RequestBody>) -> Result<http::Response<ResponseBody>, Error> + Send + Sync + 'static,
) -> ServiceRequest<'r, 'c> {
req.extensions_mut().insert(Arc::new(handler) as HandlerFn);
ServiceRequest {
Expand Down
Loading