Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions http-encoding/src/encode.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Stream encoders.

use futures_core::Stream;
use http::{header, Response, StatusCode, Version};
use http::{header, Response, StatusCode};

use super::{
coder::{Coder, FeaturedCode},
Expand Down Expand Up @@ -56,13 +56,13 @@ where
}

#[cfg(any(feature = "br", feature = "gz", feature = "de"))]
fn update_header(headers: &mut header::HeaderMap, value: &'static str, version: Version) {
fn update_header(headers: &mut header::HeaderMap, value: &'static str, version: http::Version) {
headers.insert(header::CONTENT_ENCODING, header::HeaderValue::from_static(value));
headers.remove(header::CONTENT_LENGTH);

// Connection specific headers are not allowed in HTTP/2 and later versions.
// see https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.2
if version < Version::HTTP_2 {
if version < http::Version::HTTP_2 {
headers.insert(header::TRANSFER_ENCODING, header::HeaderValue::from_static("chunked"));
}
}
6 changes: 3 additions & 3 deletions http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ io-uring = ["xitca-io/runtime-uring", "tokio-uring"]
router = ["xitca-router"]

[dependencies]
xitca-io = "0.4.0"
xitca-io = "0.4.2"
xitca-service = { version = "0.3.0", features = ["alloc"] }
xitca-unsafe-collection = { version = "0.2.0", features = ["bytes"] }

futures-core = "0.3.17"
http = "1"
httpdate = "1.0"
pin-project-lite = "0.2.10"
pin-project-lite = "0.2.16"
tracing = { version = "0.1.40", default-features = false }

# native tls support
Expand Down Expand Up @@ -80,7 +80,7 @@ tokio-uring = { version = "0.5.0", features = ["bytes"], optional = true }
socket2 = { version = "0.6.0", features = ["all"] }

[dev-dependencies]
criterion = "0.5"
criterion = "0.8.0"
xitca-server = "0.5"

[[bench]]
Expand Down
4 changes: 2 additions & 2 deletions http/benches/h1_decode.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::time::SystemTime;
use std::{hint::black_box, time::SystemTime};

use criterion::{Criterion, black_box, criterion_group, criterion_main};
use criterion::{Criterion, criterion_group, criterion_main};
use httpdate::HttpDate;
use tokio::time::Instant;
use xitca_http::{
Expand Down
238 changes: 18 additions & 220 deletions http/src/h1/body.rs
Original file line number Diff line number Diff line change
@@ -1,59 +1,38 @@
use core::{
cell::{RefCell, RefMut},
future::poll_fn,
ops::DerefMut,
fmt,
pin::Pin,
task::{Context, Poll, Waker},
task::{Context, Poll},
};

use std::{collections::VecDeque, io, rc::Rc};
use std::io;

use futures_core::stream::Stream;

use crate::bytes::Bytes;

/// max buffer size 32k
pub(crate) const MAX_BUFFER_SIZE: usize = 32_768;

#[derive(Clone, Debug)]
enum RequestBodyInner {
Some(Rc<RefCell<Inner>>),
#[cfg(feature = "io-uring")]
Completion(super::dispatcher_uring::Body),
None,
}

impl RequestBodyInner {
fn new(eof: bool) -> Self {
match eof {
true => Self::None,
false => Self::Some(Default::default()),
}
}
}

/// Buffered stream of request body chunk.
///
/// impl [Stream] trait to produce chunk as [Bytes] type in async manner.
#[derive(Debug)]
pub struct RequestBody(RequestBodyInner);
pub struct RequestBody(Option<Pin<Box<dyn Stream<Item = io::Result<Bytes>>>>>);

impl Default for RequestBody {
fn default() -> Self {
Self(RequestBodyInner::new(true))
impl fmt::Debug for RequestBody {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("RequestBody")
}
}

impl RequestBody {
// an async spsc channel where RequestBodySender used to push data and popped from RequestBody.
pub(super) fn channel(eof: bool) -> (RequestBodySender, Self) {
let inner = RequestBodyInner::new(eof);
(RequestBodySender(inner.clone()), RequestBody(inner))
pub(super) fn new<S>(body: S) -> Self
where
S: Stream<Item = io::Result<Bytes>> + 'static,
{
Self(Some(Box::pin(body)))
}
}

#[cfg(feature = "io-uring")]
pub(super) fn io_uring(body: super::dispatcher_uring::Body) -> Self {
RequestBody(RequestBodyInner::Completion(body))
impl Default for RequestBody {
fn default() -> Self {
Self(None)
}
}

Expand All @@ -62,10 +41,8 @@ impl Stream for RequestBody {

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<io::Result<Bytes>>> {
match self.get_mut().0 {
RequestBodyInner::Some(ref mut inner) => inner.borrow_mut().poll_next_unpin(cx),
RequestBodyInner::None => Poll::Ready(None),
#[cfg(feature = "io-uring")]
RequestBodyInner::Completion(ref mut body) => Pin::new(body).poll_next(cx),
None => Poll::Ready(None),
Some(ref mut body) => Pin::new(body).poll_next(cx),
}
}
}
Expand All @@ -75,182 +52,3 @@ impl From<RequestBody> for crate::body::RequestBody {
Self::H1(body)
}
}

/// Sender part of the payload stream
pub struct RequestBodySender(RequestBodyInner);

// TODO: rework early eof error handling.
impl Drop for RequestBodySender {
fn drop(&mut self) {
if let Some(mut inner) = self.try_inner() {
if !inner.eof {
inner.feed_error(io::ErrorKind::UnexpectedEof.into());
}
}
}
}

impl RequestBodySender {
// try to get a mutable reference of inner and ignore RequestBody::None variant.
fn try_inner(&mut self) -> Option<RefMut<'_, Inner>> {
self.try_inner_on_none_with(|| {})
}

// try to get a mutable reference of inner and panic on RequestBody::None variant.
// this is a runtime check for internal optimization to avoid unnecessary operations.
// public api must not be able to trigger this panic.
fn try_inner_infallible(&mut self) -> Option<RefMut<'_, Inner>> {
self.try_inner_on_none_with(|| panic!("No Request Body found. Do not waste operation on Sender."))
}

fn try_inner_on_none_with<F>(&mut self, func: F) -> Option<RefMut<'_, Inner>>
where
F: FnOnce(),
{
match self.0 {
RequestBodyInner::Some(ref inner) => {
// request body is a shared pointer between only two owners and no weak reference.
debug_assert!(Rc::strong_count(inner) <= 2);
debug_assert_eq!(Rc::weak_count(inner), 0);
(Rc::strong_count(inner) != 1).then_some(inner.borrow_mut())
}
_ => {
func();
None
}
}
}

pub(super) fn feed_error(&mut self, e: io::Error) {
if let Some(mut inner) = self.try_inner_infallible() {
inner.feed_error(e);
}
}

pub(super) fn feed_eof(&mut self) {
if let Some(mut inner) = self.try_inner_infallible() {
inner.feed_eof();
}
}

pub(super) fn feed_data(&mut self, data: Bytes) {
if let Some(mut inner) = self.try_inner_infallible() {
inner.feed_data(data);
}
}

pub(super) fn ready(&mut self) -> impl Future<Output = io::Result<()>> + '_ {
self.ready_with(|inner| !inner.backpressure())
}

// Lazily wait until RequestBody is already polled.
// For specific use case body must not be eagerly polled.
// For example: Request with Expect: Continue header.
pub(super) fn wait_for_poll(&mut self) -> impl Future<Output = io::Result<()>> + '_ {
self.ready_with(|inner| inner.waiting())
}

async fn ready_with<F>(&mut self, func: F) -> io::Result<()>
where
F: Fn(&mut Inner) -> bool,
{
poll_fn(|cx| {
// Check only if Payload (other side) is alive, Otherwise always return io error.
match self.try_inner_infallible() {
Some(mut inner) => {
if func(inner.deref_mut()) {
Poll::Ready(Ok(()))
} else {
// when payload is not ready register current task waker and wait.
inner.register_io(cx);
Poll::Pending
}
}
None => Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into())),
}
})
.await
}
}

#[derive(Debug, Default)]
struct Inner {
eof: bool,
len: usize,
err: Option<io::Error>,
items: VecDeque<Bytes>,
task: Option<Waker>,
io_task: Option<Waker>,
}

impl Inner {
/// Wake up future waiting for payload data to be available.
fn wake(&mut self) {
if let Some(waker) = self.task.take() {
waker.wake();
}
}

/// Wake up future feeding data to Payload.
fn wake_io(&mut self) {
if let Some(waker) = self.io_task.take() {
waker.wake();
}
}

/// true when a future is waiting for payload data.
fn waiting(&self) -> bool {
self.task.is_some()
}

/// Register future waiting data from payload.
/// Waker would be used in `Inner::wake`
fn register(&mut self, cx: &Context<'_>) {
if self.task.as_ref().map(|w| !cx.waker().will_wake(w)).unwrap_or(true) {
self.task = Some(cx.waker().clone());
}
}

// Register future feeding data to payload.
/// Waker would be used in `Inner::wake_io`
fn register_io(&mut self, cx: &Context<'_>) {
if self.io_task.as_ref().map(|w| !cx.waker().will_wake(w)).unwrap_or(true) {
self.io_task = Some(cx.waker().clone());
}
}

fn feed_error(&mut self, err: io::Error) {
self.err = Some(err);
self.wake();
}

fn feed_eof(&mut self) {
self.eof = true;
self.wake();
}

fn feed_data(&mut self, data: Bytes) {
self.len += data.len();
self.items.push_back(data);
self.wake();
}

fn backpressure(&self) -> bool {
self.len >= MAX_BUFFER_SIZE
}

fn poll_next_unpin(&mut self, cx: &Context<'_>) -> Poll<Option<io::Result<Bytes>>> {
if let Some(data) = self.items.pop_front() {
self.len -= data.len();
Poll::Ready(Some(Ok(data)))
} else if let Some(err) = self.err.take() {
Poll::Ready(Some(Err(err)))
} else if self.eof {
Poll::Ready(None)
} else {
self.register(cx);
self.wake_io();
Poll::Pending
}
}
}
Loading