From a2ee77e946693c8393144b913f485d0190dae033 Mon Sep 17 00:00:00 2001 From: Joel Wurtz Date: Wed, 12 Feb 2025 19:52:38 +0100 Subject: [PATCH 1/3] feat(client): handle max requests for exclusive pool --- client/src/builder.rs | 20 +++++++++++++++++++- client/src/pool/exclusive.rs | 30 ++++++++++++++++++++++-------- 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/client/src/builder.rs b/client/src/builder.rs index f709077dc..8255b45e8 100644 --- a/client/src/builder.rs +++ b/client/src/builder.rs @@ -25,6 +25,7 @@ pub struct ClientBuilder { pool_capacity: usize, keep_alive_idle: Duration, keep_alive_born: Duration, + keep_alive_max_requests: usize, timeout_config: TimeoutConfig, local_addr: Option, max_http_version: Version, @@ -45,6 +46,7 @@ impl ClientBuilder { pool_capacity: 2, keep_alive_idle: Duration::from_secs(60), keep_alive_born: Duration::from_secs(3600), + keep_alive_max_requests: 10_000, timeout_config: TimeoutConfig::new(), local_addr: None, max_http_version: max_http_version(), @@ -353,6 +355,17 @@ impl ClientBuilder { self } + /// Set max requests to handle for a keep alive connection. + /// + /// This settings will force the connection to be dropped after this many requests. + /// + /// Default to 10 000. + /// + pub fn set_keep_alive_max_requests(mut self, max_requests: usize) -> Self { + self.keep_alive_max_requests = max_requests; + self + } + /// Set max http version client would be used. /// /// Default to the max version of http feature enabled within Cargo.toml @@ -481,7 +494,12 @@ impl ClientBuilder { }; Client { - exclusive_pool: pool::exclusive::Pool::new(self.pool_capacity, self.keep_alive_idle, self.keep_alive_born), + exclusive_pool: pool::exclusive::Pool::new( + self.pool_capacity, + self.keep_alive_idle, + self.keep_alive_born, + self.keep_alive_max_requests, + ), shared_pool: pool::shared::Pool::with_capacity(self.pool_capacity), connector: self.connector, resolver: self.resolver, diff --git a/client/src/pool/exclusive.rs b/client/src/pool/exclusive.rs index 0391d295b..812151bb5 100644 --- a/client/src/pool/exclusive.rs +++ b/client/src/pool/exclusive.rs @@ -23,6 +23,7 @@ pub struct Pool { cap: usize, keep_alive_idle: Duration, keep_alive_born: Duration, + max_requests: usize, } impl Clone for Pool { @@ -32,6 +33,7 @@ impl Clone for Pool { cap: self.cap, keep_alive_idle: self.keep_alive_idle, keep_alive_born: self.keep_alive_born, + max_requests: self.max_requests, } } } @@ -40,12 +42,13 @@ impl Pool where K: Eq + Hash + Clone, { - pub(crate) fn new(cap: usize, keep_alive_idle: Duration, keep_alive_born: Duration) -> Self { + pub(crate) fn new(cap: usize, keep_alive_idle: Duration, keep_alive_born: Duration, max_requests: usize) -> Self { Self { conns: Arc::new(Mutex::new(HashMap::new())), cap, keep_alive_idle, keep_alive_born, + max_requests, } } @@ -120,7 +123,7 @@ where if res.is_ok() { queue.push_back(PooledConn { conn, - state: ConnState::new(self.keep_alive_idle, self.keep_alive_born), + state: ConnState::new(self.keep_alive_idle, self.keep_alive_born, self.max_requests), }); } } @@ -129,7 +132,7 @@ where let mut queue = VecDeque::with_capacity(self.cap); queue.push_back(PooledConn { conn, - state: ConnState::new(self.keep_alive_idle, self.keep_alive_born), + state: ConnState::new(self.keep_alive_idle, self.keep_alive_born, self.max_requests), }); conns.insert(key, (permits, queue)); } @@ -222,7 +225,7 @@ where let mut conns = self.pool.conns.lock().unwrap(); if let Some((_, queue)) = conns.get_mut(&self.key) { - conn.state.update_idle(); + conn.state.update_for_reentry(); queue.push_back(conn); } @@ -263,28 +266,35 @@ impl DerefMut for PooledConn { struct ConnState { born: Instant, idle_since: Instant, + requests: usize, keep_alive_idle: Duration, keep_alive_born: Duration, + max_requests: usize, } impl ConnState { - fn new(keep_alive_idle: Duration, keep_alive_born: Duration) -> Self { + fn new(keep_alive_idle: Duration, keep_alive_born: Duration, max_requests: usize) -> Self { let now = Instant::now(); Self { born: now, idle_since: now, + requests: 0, keep_alive_idle, keep_alive_born, + max_requests, } } - fn update_idle(&mut self) { + fn update_for_reentry(&mut self) { self.idle_since = Instant::now(); + self.requests += 1; } fn is_expired(&self) -> bool { - self.born.elapsed() > self.keep_alive_born || self.idle_since.elapsed() > self.keep_alive_idle + self.born.elapsed() > self.keep_alive_born + || self.idle_since.elapsed() > self.keep_alive_idle + || self.requests >= self.max_requests } } @@ -309,7 +319,11 @@ where if let Some((_, queue)) = self.pool.conns.lock().unwrap().get_mut(&self.key) { queue.push_back(PooledConn { conn, - state: ConnState::new(self.pool.keep_alive_idle, self.pool.keep_alive_born), + state: ConnState::new( + self.pool.keep_alive_idle, + self.pool.keep_alive_born, + self.pool.max_requests, + ), }); } } From 7335a665b66e87a859a42101be559c77aa671244 Mon Sep 17 00:00:00 2001 From: Joel Wurtz Date: Wed, 12 Feb 2025 20:57:20 +0100 Subject: [PATCH 2/3] feat(h1): allow to set keep alive configuration from response header --- client/src/pool/exclusive.rs | 13 ++++++++++++ client/src/service/http.rs | 40 +++++++++++++++++++++++++++++++++++- 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/client/src/pool/exclusive.rs b/client/src/pool/exclusive.rs index 812151bb5..d9f466bae 100644 --- a/client/src/pool/exclusive.rs +++ b/client/src/pool/exclusive.rs @@ -206,6 +206,19 @@ where self.destroy_on_drop = true; } + #[cfg(feature = "http1")] + pub(crate) fn keep_alive_hint(&mut self, timeout: Option, max_requests: Option) { + if let Some(conn) = self.conn.as_mut() { + if let Some(timeout) = timeout { + conn.state.keep_alive_idle = timeout; + } + + if let Some(max_requests) = max_requests { + conn.state.max_requests = max_requests; + } + } + } + #[cfg(feature = "http1")] pub(crate) fn is_destroy_on_drop(&self) -> bool { self.destroy_on_drop diff --git a/client/src/service/http.rs b/client/src/service/http.rs index 725cbd28b..150c53205 100644 --- a/client/src/service/http.rs +++ b/client/src/service/http.rs @@ -1,8 +1,10 @@ +use std::time::Duration; + use crate::{ Service, ServiceRequest, connect::Connect, error::Error, - http::Version, + http::{HeaderName, Version}, pool::{exclusive, shared}, response::Response, service::ServiceDyn, @@ -166,6 +168,9 @@ pub(crate) fn base_service() -> HttpService { Ok(Ok((res, buf, decoder, is_close))) => { if is_close { _conn.destroy_on_drop(); + } else { + let (timeout, max) = parse_keep_alive(&res); + _conn.keep_alive_hint(timeout, max); } let body = crate::h1::body::ResponseBody::new(_conn, buf, decoder); let res = res.map(|_| crate::body::ResponseBody::H1(body)); @@ -201,3 +206,36 @@ pub(crate) fn base_service() -> HttpService { Box::new(HttpService) } + +const KEEP_ALIVE: HeaderName = HeaderName::from_static("keep-alive"); + +fn parse_keep_alive(res: &crate::http::Response) -> (Option, Option) { + let header = match res.headers().get(KEEP_ALIVE).map(|h| h.to_str()) { + Some(Ok(header)) => header, + _ => return (None, None), + }; + + let mut timeout = None; + let mut max = None; + + for (key, value) in header.split(',').map(|item| { + let mut kv = item.splitn(2, '='); + + ( + kv.next().map(|s| s.trim()).unwrap_or_default(), + kv.next().map(|s| s.trim()).unwrap_or_default(), + ) + }) { + match key.to_lowercase().as_str() { + "timeout" => { + timeout = value.parse::().ok().map(Duration::from_secs); + } + "max" => { + max = value.parse().ok(); + } + _ => {} + } + } + + (timeout, max) +} From 7751f0e27ddea86f17eed736d729803e5bdd00e6 Mon Sep 17 00:00:00 2001 From: Joel Wurtz Date: Tue, 25 Feb 2025 12:24:30 +0100 Subject: [PATCH 3/3] feat(client): add test for keep alive parsing --- client/src/service/http.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/client/src/service/http.rs b/client/src/service/http.rs index 150c53205..a33198f19 100644 --- a/client/src/service/http.rs +++ b/client/src/service/http.rs @@ -239,3 +239,23 @@ fn parse_keep_alive(res: &crate::http::Response) -> (Option, Opt (timeout, max) } + +#[cfg(test)] +mod test { + use crate::{body::ResponseBody, http}; + + use super::*; + + #[test] + fn test_parse_timeout_and_max() { + let res = http::Response::builder() + .header("keep-alive", "timeout=100, max=10") + .body(ResponseBody::Eof) + .unwrap(); + + let (timeout, max) = parse_keep_alive(&res); + + assert_eq!(timeout, Some(Duration::from_secs(100))); + assert_eq!(max, Some(10)); + } +}