From 212c2b9d6dab940eebcac63bfbeb8537e5a175ef Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Tue, 20 May 2025 10:03:37 +0530 Subject: [PATCH 1/2] Use interior mutability in backoff --- grpc/src/client/name_resolution/backoff.rs | 38 ++++++++++++---------- grpc/src/client/name_resolution/dns/mod.rs | 2 +- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/grpc/src/client/name_resolution/backoff.rs b/grpc/src/client/name_resolution/backoff.rs index 35b560eef..6b9adf7bd 100644 --- a/grpc/src/client/name_resolution/backoff.rs +++ b/grpc/src/client/name_resolution/backoff.rs @@ -1,5 +1,5 @@ use rand::Rng; -use std::time::Duration; +use std::{sync::Mutex, time::Duration}; /// TODO(arjan-bal): Move this #[derive(Clone)] @@ -23,7 +23,7 @@ pub struct ExponentialBackoff { /// The delay for the next retry, without the random jitter. Store as f64 /// to avoid rounding errors. - next_delay_secs: f64, + next_delay_secs: Mutex, } /// This is a backoff configuration with the default values specified @@ -51,23 +51,25 @@ impl ExponentialBackoff { let next_delay_secs = config.base_delay.as_secs_f64(); ExponentialBackoff { config, - next_delay_secs, + next_delay_secs: Mutex::new(next_delay_secs), } } - pub fn reset(&mut self) { - self.next_delay_secs = self.config.base_delay.as_secs_f64(); + pub fn reset(&self) { + let mut next_delay = self.next_delay_secs.lock().unwrap(); + *next_delay = self.config.base_delay.as_secs_f64(); } - pub fn backoff_duration(&mut self) -> Duration { - let ret = self.next_delay_secs - * (1.0 + self.config.jitter * rand::thread_rng().gen_range(-1.0..1.0)); - self.next_delay_secs = self + pub fn backoff_duration(&self) -> Duration { + let mut next_delay = self.next_delay_secs.lock().unwrap(); + let cur_delay = + *next_delay * (1.0 + self.config.jitter * rand::thread_rng().gen_range(-1.0..1.0)); + *next_delay = self .config .max_delay .as_secs_f64() - .min(self.next_delay_secs * self.config.multiplier); - Duration::from_secs_f64(ret) + .min(*next_delay * self.config.multiplier); + Duration::from_secs_f64(cur_delay) } } @@ -89,7 +91,7 @@ mod tests { jitter: 0.0, max_delay: Duration::from_secs(100), }; - let mut backoff = ExponentialBackoff::new(config.clone()); + let backoff = ExponentialBackoff::new(config.clone()); assert_eq!(backoff.backoff_duration(), Duration::from_secs(10)); } @@ -101,7 +103,7 @@ mod tests { base_delay: Duration::from_secs(100), max_delay: Duration::from_secs(10), }; - let mut backoff = ExponentialBackoff::new(config.clone()); + let backoff = ExponentialBackoff::new(config.clone()); assert_eq!(backoff.backoff_duration(), Duration::from_secs(10)); } @@ -113,7 +115,7 @@ mod tests { base_delay: Duration::from_secs(10), max_delay: Duration::from_secs(100), }; - let mut backoff = ExponentialBackoff::new(config.clone()); + let backoff = ExponentialBackoff::new(config.clone()); // multiplier gets clipped to 1. assert_eq!(backoff.backoff_duration(), Duration::from_secs(10)); assert_eq!(backoff.backoff_duration(), Duration::from_secs(10)); @@ -127,7 +129,7 @@ mod tests { base_delay: Duration::from_secs(10), max_delay: Duration::from_secs(100), }; - let mut backoff = ExponentialBackoff::new(config.clone()); + let backoff = ExponentialBackoff::new(config.clone()); // jitter gets clipped to 0. assert_eq!(backoff.backoff_duration(), Duration::from_secs(10)); assert_eq!(backoff.backoff_duration(), Duration::from_secs(10)); @@ -141,7 +143,7 @@ mod tests { base_delay: Duration::from_secs(10), max_delay: Duration::from_secs(100), }; - let mut backoff = ExponentialBackoff::new(config.clone()); + let backoff = ExponentialBackoff::new(config.clone()); // jitter gets clipped to 1. // 0 <= duration <= 20. let duration = backoff.backoff_duration(); @@ -161,7 +163,7 @@ mod tests { base_delay: Duration::from_secs(1), max_delay: Duration::from_secs(15), }; - let mut backoff = ExponentialBackoff::new(config.clone()); + let backoff = ExponentialBackoff::new(config.clone()); assert_eq!(backoff.backoff_duration(), Duration::from_secs(1)); assert_eq!(backoff.backoff_duration(), Duration::from_secs(2)); assert_eq!(backoff.backoff_duration(), Duration::from_secs(4)); @@ -189,7 +191,7 @@ mod tests { base_delay: Duration::from_secs(1), max_delay: Duration::from_secs(15), }; - let mut backoff = ExponentialBackoff::new(config.clone()); + let backoff = ExponentialBackoff::new(config.clone()); // 0.8 <= duration <= 1.2. let duration = backoff.backoff_duration(); assert_eq!(duration.gt(&Duration::from_secs_f64(0.8 - EPSILON)), true); diff --git a/grpc/src/client/name_resolution/dns/mod.rs b/grpc/src/client/name_resolution/dns/mod.rs index 5d87f02e3..32d1f6efa 100644 --- a/grpc/src/client/name_resolution/dns/mod.rs +++ b/grpc/src/client/name_resolution/dns/mod.rs @@ -107,7 +107,7 @@ impl DnsResolver { tokio::sync::mpsc::unbounded_channel::>(); let handle = options.runtime.clone().spawn(Box::pin(async move { - let mut backoff = ExponentialBackoff::new(dns_opts.backoff_config.clone()); + let backoff = ExponentialBackoff::new(dns_opts.backoff_config.clone()); let state = state_copy; let work_scheduler = options.work_scheduler; let mut update_error_rx = update_error_rx; From 168e6423cb67e25f61cc749146e5ccdf2690fd48 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Tue, 20 May 2025 10:03:58 +0530 Subject: [PATCH 2/2] Resolver API changes --- grpc/src/client/name_resolution/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/grpc/src/client/name_resolution/mod.rs b/grpc/src/client/name_resolution/mod.rs index a4864b07f..bb1337dac 100644 --- a/grpc/src/client/name_resolution/mod.rs +++ b/grpc/src/client/name_resolution/mod.rs @@ -128,12 +128,12 @@ pub trait ResolverBuilder: Send + Sync { fn scheme(&self) -> &str; /// Returns the default authority for a channel using this name resolver - /// and target. This is typically the same as the service's name. By + /// and target. This is typically the same as the service's name. By /// default, the default_authority method automatically returns the path /// portion of the target URI, with the leading prefix removed. - fn default_authority<'a>(&self, uri: &'a Target) -> &'a str { + fn default_authority(&self, uri: &Target) -> String { let path = uri.path(); - path.strip_prefix("/").unwrap_or(path) + path.strip_prefix("/").unwrap_or(path).to_string() } /// Returns a bool indicating whether the input uri is valid to create a @@ -159,7 +159,7 @@ pub struct ResolverOptions { /// Used to asynchronously request a call into the Resolver's work method. pub trait WorkScheduler: Send + Sync { - // Schedules a call into the LbPolicy's work method. If there is already a + // Schedules a call into the Resolver's work method. If there is already a // pending work call that has not yet started, this may not schedule another // call. fn schedule_work(&self);