Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 20 additions & 18 deletions grpc/src/client/name_resolution/backoff.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use rand::Rng;
use std::time::Duration;
use std::{sync::Mutex, time::Duration};

/// TODO(arjan-bal): Move this
#[derive(Clone)]
Expand All @@ -23,7 +23,7 @@ pub struct ExponentialBackoff {

/// The delay for the next retry, without the random jitter. Store as f64
/// to avoid rounding errors.
next_delay_secs: f64,
next_delay_secs: Mutex<f64>,
}

/// This is a backoff configuration with the default values specified
Expand Down Expand Up @@ -51,23 +51,25 @@ impl ExponentialBackoff {
let next_delay_secs = config.base_delay.as_secs_f64();
ExponentialBackoff {
config,
next_delay_secs,
next_delay_secs: Mutex::new(next_delay_secs),
}
}

pub fn reset(&mut self) {
self.next_delay_secs = self.config.base_delay.as_secs_f64();
pub fn reset(&self) {
let mut next_delay = self.next_delay_secs.lock().unwrap();
*next_delay = self.config.base_delay.as_secs_f64();
}

pub fn backoff_duration(&mut self) -> Duration {
let ret = self.next_delay_secs
* (1.0 + self.config.jitter * rand::thread_rng().gen_range(-1.0..1.0));
self.next_delay_secs = self
pub fn backoff_duration(&self) -> Duration {
let mut next_delay = self.next_delay_secs.lock().unwrap();
let cur_delay =
*next_delay * (1.0 + self.config.jitter * rand::thread_rng().gen_range(-1.0..1.0));
*next_delay = self
.config
.max_delay
.as_secs_f64()
.min(self.next_delay_secs * self.config.multiplier);
Duration::from_secs_f64(ret)
.min(*next_delay * self.config.multiplier);
Duration::from_secs_f64(cur_delay)
}
}

Expand All @@ -89,7 +91,7 @@ mod tests {
jitter: 0.0,
max_delay: Duration::from_secs(100),
};
let mut backoff = ExponentialBackoff::new(config.clone());
let backoff = ExponentialBackoff::new(config.clone());
assert_eq!(backoff.backoff_duration(), Duration::from_secs(10));
}

Expand All @@ -101,7 +103,7 @@ mod tests {
base_delay: Duration::from_secs(100),
max_delay: Duration::from_secs(10),
};
let mut backoff = ExponentialBackoff::new(config.clone());
let backoff = ExponentialBackoff::new(config.clone());
assert_eq!(backoff.backoff_duration(), Duration::from_secs(10));
}

Expand All @@ -113,7 +115,7 @@ mod tests {
base_delay: Duration::from_secs(10),
max_delay: Duration::from_secs(100),
};
let mut backoff = ExponentialBackoff::new(config.clone());
let backoff = ExponentialBackoff::new(config.clone());
// multiplier gets clipped to 1.
assert_eq!(backoff.backoff_duration(), Duration::from_secs(10));
assert_eq!(backoff.backoff_duration(), Duration::from_secs(10));
Expand All @@ -127,7 +129,7 @@ mod tests {
base_delay: Duration::from_secs(10),
max_delay: Duration::from_secs(100),
};
let mut backoff = ExponentialBackoff::new(config.clone());
let backoff = ExponentialBackoff::new(config.clone());
// jitter gets clipped to 0.
assert_eq!(backoff.backoff_duration(), Duration::from_secs(10));
assert_eq!(backoff.backoff_duration(), Duration::from_secs(10));
Expand All @@ -141,7 +143,7 @@ mod tests {
base_delay: Duration::from_secs(10),
max_delay: Duration::from_secs(100),
};
let mut backoff = ExponentialBackoff::new(config.clone());
let backoff = ExponentialBackoff::new(config.clone());
// jitter gets clipped to 1.
// 0 <= duration <= 20.
let duration = backoff.backoff_duration();
Expand All @@ -161,7 +163,7 @@ mod tests {
base_delay: Duration::from_secs(1),
max_delay: Duration::from_secs(15),
};
let mut backoff = ExponentialBackoff::new(config.clone());
let backoff = ExponentialBackoff::new(config.clone());
assert_eq!(backoff.backoff_duration(), Duration::from_secs(1));
assert_eq!(backoff.backoff_duration(), Duration::from_secs(2));
assert_eq!(backoff.backoff_duration(), Duration::from_secs(4));
Expand Down Expand Up @@ -189,7 +191,7 @@ mod tests {
base_delay: Duration::from_secs(1),
max_delay: Duration::from_secs(15),
};
let mut backoff = ExponentialBackoff::new(config.clone());
let backoff = ExponentialBackoff::new(config.clone());
// 0.8 <= duration <= 1.2.
let duration = backoff.backoff_duration();
assert_eq!(duration.gt(&Duration::from_secs_f64(0.8 - EPSILON)), true);
Expand Down
2 changes: 1 addition & 1 deletion grpc/src/client/name_resolution/dns/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl DnsResolver {
tokio::sync::mpsc::unbounded_channel::<Result<(), String>>();

let handle = options.runtime.clone().spawn(Box::pin(async move {
let mut backoff = ExponentialBackoff::new(dns_opts.backoff_config.clone());
let backoff = ExponentialBackoff::new(dns_opts.backoff_config.clone());
let state = state_copy;
let work_scheduler = options.work_scheduler;
let mut update_error_rx = update_error_rx;
Expand Down
8 changes: 4 additions & 4 deletions grpc/src/client/name_resolution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,12 @@ pub trait ResolverBuilder: Send + Sync {
fn scheme(&self) -> &str;

/// Returns the default authority for a channel using this name resolver
/// and target. This is typically the same as the service's name. By
/// and target. This is typically the same as the service's name. By
/// default, the default_authority method automatically returns the path
/// portion of the target URI, with the leading prefix removed.
fn default_authority<'a>(&self, uri: &'a Target) -> &'a str {
fn default_authority(&self, uri: &Target) -> String {
let path = uri.path();
path.strip_prefix("/").unwrap_or(path)
path.strip_prefix("/").unwrap_or(path).to_string()
}

/// Returns a bool indicating whether the input uri is valid to create a
Expand All @@ -159,7 +159,7 @@ pub struct ResolverOptions {

/// Used to asynchronously request a call into the Resolver's work method.
pub trait WorkScheduler: Send + Sync {
// Schedules a call into the LbPolicy's work method. If there is already a
// Schedules a call into the Resolver's work method. If there is already a
// pending work call that has not yet started, this may not schedule another
// call.
fn schedule_work(&self);
Expand Down
Loading