From b5ceed2e5233f1d0abbaf4308c178e178d97ee83 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 5 Jun 2025 14:51:09 -0700 Subject: [PATCH 1/2] old child manager impl --- .../client/load_balancing/child_manager.rs | 255 ++++++++---------- 1 file changed, 111 insertions(+), 144 deletions(-) diff --git a/grpc/src/client/load_balancing/child_manager.rs b/grpc/src/client/load_balancing/child_manager.rs index ca03b691e..10cdc11ef 100644 --- a/grpc/src/client/load_balancing/child_manager.rs +++ b/grpc/src/client/load_balancing/child_manager.rs @@ -23,41 +23,32 @@ // policy in use. Complete tests must be written before it can be used in // production. Also, support for the work scheduler is missing. -use std::collections::HashSet; -use std::sync::Mutex; use std::{collections::HashMap, error::Error, hash::Hash, mem, sync::Arc}; use crate::client::load_balancing::{ - ChannelController, LbPolicy, LbPolicyBuilder, LbPolicyOptions, LbState, WeakSubchannel, - WorkScheduler, + ChannelController, LbConfig, LbPolicy, LbPolicyBuilder, LbPolicyOptions, LbState, WorkScheduler, }; use crate::client::name_resolution::{Address, ResolverUpdate}; -use crate::client::service_config::LbConfig; use super::{Subchannel, SubchannelState}; -use tokio::sync::{mpsc, watch, Notify}; -use tokio::task::{AbortHandle, JoinHandle}; - // An LbPolicy implementation that manages multiple children. pub struct ChildManager { - subchannels: HashMap>, - children: HashMap, Child>, - sharder: Box>, - updated: bool, // true iff a child has updated its state since the last call to has_updated. - work_requests: Arc>>>, - work_scheduler: Arc, + subchannel_child_map: HashMap, + children: Vec>, + shard_update: Box>, } -pub trait ChildIdentifier: PartialEq + Hash + Eq + Send + Sync + 'static {} - -struct Child { +struct Child { + identifier: T, policy: Box, state: LbState, } /// A collection of data sent to a child of the ChildManager. 
-pub struct ChildUpdate { +pub struct ChildUpdate { + /// The identifier the ChildManager should use for this child. + pub child_identifier: T, /// The builder the ChildManager should use to create this child if it does /// not exist. pub child_policy_builder: Box, @@ -65,31 +56,23 @@ pub struct ChildUpdate { pub child_update: ResolverUpdate, } -pub trait ResolverUpdateSharder: Send { - /// Performs the operation of sharding an aggregate ResolverUpdate into one - /// or more ChildUpdates. Called automatically by the ChildManager when its - /// resolver_update method is called. The key in the returned map is the - /// identifier the ChildManager should use for this child. - fn shard_update( - &self, - resolver_update: ResolverUpdate, - ) -> Result, Box>; -} +// TODO: convert to a trait? +/// Performs the operation of sharding an aggregate ResolverUpdate into one or +/// more ChildUpdates. Called automatically by the ChildManager when its +/// resolver_update method is called. +pub type ResolverUpdateSharder = + fn( + ResolverUpdate, + ) -> Result>>, Box>; -impl ChildManager { +impl ChildManager { /// Creates a new ChildManager LB policy. shard_update is called whenever a /// resolver_update operation occurs. 
- pub fn new( - work_scheduler: Arc, - sharder: Box>, - ) -> Self { - ChildManager { - subchannels: HashMap::default(), - children: HashMap::default(), - sharder, - updated: false, - work_requests: Arc::default(), - work_scheduler, + pub fn new(shard_update: Box>) -> Self { + Self { + subchannel_child_map: HashMap::default(), + children: Vec::default(), + shard_update, } } @@ -97,16 +80,12 @@ impl ChildManager { pub fn child_states(&mut self) -> impl Iterator { self.children .iter() - .map(|(id, child)| (id.as_ref(), &child.state)) - } - - pub fn has_updated(&mut self) -> bool { - mem::take(&mut self.updated) + .map(|child| (&child.identifier, &child.state)) } // Called to update all accounting in the ChildManager from operations // performed by a child policy on the WrappedController that was created for - // it. + // it. child_idx is an index into the children map for the relevant child. // // TODO: this post-processing step can be eliminated by capturing the right // state inside the WrappedController, however it is fairly complex. Decide @@ -114,33 +93,20 @@ impl ChildManager { fn resolve_child_controller( &mut self, channel_controller: WrappedController, - child_id: Arc, + child_idx: usize, ) { // Add all created subchannels into the subchannel_child_map. for csc in channel_controller.created_subchannels { - self.subchannels - .insert(WeakSubchannel::new(csc), child_id.clone()); + self.subchannel_child_map.insert(csc, child_idx); } // Update the tracked state if the child produced an update. if let Some(state) = channel_controller.picker_update { - self.children.get_mut(&child_id.clone()).unwrap().state = state; - self.updated = true; + self.children[child_idx].state = state; }; - // Prune subchannels created by this child that are no longer - // referenced. 
- self.subchannels.retain(|sc, cid| { - if cid != &child_id { - return true; - } - if sc.upgrade().is_none() { - return false; - } - true - }); } } -impl LbPolicy for ChildManager { +impl LbPolicy for ChildManager { fn resolver_update( &mut self, resolver_update: ResolverUpdate, @@ -148,90 +114,110 @@ impl LbPolicy for ChildManager { channel_controller: &mut dyn ChannelController, ) -> Result<(), Box> { // First determine if the incoming update is valid. - let child_updates = self.sharder.shard_update(resolver_update)?; + let child_updates = (self.shard_update)(resolver_update)?; - // Remove children that are no longer active. - self.children - .retain(|child_id, _| child_updates.contains_key(child_id)); + // Replace self.children with an empty vec. + let mut old_children = vec![]; + mem::swap(&mut self.children, &mut old_children); + + // Replace the subchannel map with an empty map. + let mut old_subchannel_child_map = HashMap::new(); + mem::swap( + &mut self.subchannel_child_map, + &mut old_subchannel_child_map, + ); + // Reverse the old subchannel map. + let mut old_child_subchannels_map: HashMap> = HashMap::new(); + for (subchannel, child_idx) in old_subchannel_child_map { + old_child_subchannels_map + .entry(child_idx) + .or_default() + .push(subchannel); + } + + // Build a map of the old children from their IDs for efficient lookups. + let old_children = old_children + .into_iter() + .enumerate() + .map(|(old_idx, e)| (e.identifier, (e.policy, e.state, old_idx))); + let mut old_children: HashMap = old_children.collect(); + + // Split the child updates into the IDs and builders, and the + // ResolverUpdates. + let (ids_builders, updates): (Vec<_>, Vec<_>) = child_updates + .map(|e| ((e.child_identifier, e.child_policy_builder), e.child_update)) + .unzip(); - // Apply child updates to respective policies, instantiating new ones as - // needed. 
- for (id, update) in child_updates.into_iter() { - let child_id: Arc = Arc::new(id); - let child_policy: &mut dyn LbPolicy = match self.children.get_mut(&child_id) { - Some(child) => child.policy.as_mut(), - None => { - self.children.insert( - child_id.clone(), - Child { - policy: update.child_policy_builder.build(LbPolicyOptions { - work_scheduler: Arc::new(ChildScheduler::new( - child_id.clone(), - self.work_scheduler.clone(), - self.work_requests.clone(), - )), - }), - state: LbState::initial(), - }, - ); - self.children.get_mut(&child_id).unwrap().policy.as_mut() + // Transfer children whose identifiers appear before and after the + // update, and create new children. Add entries back into the + // subchannel map. + for (new_idx, (identifier, builder)) in ids_builders.into_iter().enumerate() { + if let Some((policy, state, old_idx)) = old_children.remove(&identifier) { + for subchannel in old_child_subchannels_map + .remove(&old_idx) + .into_iter() + .flatten() + { + self.subchannel_child_map.insert(subchannel, new_idx); } + self.children.push(Child { + identifier, + state, + policy, + }); + } else { + let policy = builder.build(LbPolicyOptions { + work_scheduler: Arc::new(UnimplWorkScheduler {}), + }); + let state = LbState::initial(); + self.children.push(Child { + identifier, + state, + policy, + }); }; - let mut channel_controller = WrappedController::new(channel_controller); - let _ = child_policy.resolver_update( - update.child_update.clone(), - config, - &mut channel_controller, - ); - self.resolve_child_controller(channel_controller, child_id.clone()); } - // Keep only the subchannels associated with currently active children. - self.subchannels - .retain(|_, child_id| self.children.contains_key(child_id)); + // Anything left in old_children will just be Dropped and cleaned up. + + // Call resolver_update on all children. 
+ let mut updates = updates.into_iter(); + for child_idx in 0..self.children.len() { + let child = &mut self.children[child_idx]; + let child_update = updates.next().unwrap(); + let mut channel_controller = WrappedController::new(channel_controller); + let _ = child + .policy + .resolver_update(child_update, config, &mut channel_controller); + self.resolve_child_controller(channel_controller, child_idx); + } Ok(()) } fn subchannel_update( &mut self, - subchannel: Arc, + subchannel: &Subchannel, state: &SubchannelState, channel_controller: &mut dyn ChannelController, ) { // Determine which child created this subchannel. - let child_id = self - .subchannels - .get(&WeakSubchannel::new(subchannel.clone())) - .unwrap_or_else(|| { - panic!("Subchannel not found in child manager: {}", subchannel); - }); - let policy = &mut self.children.get_mut(&child_id.clone()).unwrap().policy; - + let child_idx = *self.subchannel_child_map.get(subchannel).unwrap(); + let policy = &mut self.children[child_idx].policy; // Wrap the channel_controller to track the child's operations. let mut channel_controller = WrappedController::new(channel_controller); // Call the proper child. policy.subchannel_update(subchannel, state, &mut channel_controller); - self.resolve_child_controller(channel_controller, child_id.clone()); + self.resolve_child_controller(channel_controller, child_idx); } - fn work(&mut self, channel_controller: &mut dyn ChannelController) { - let children = mem::take(&mut *self.work_requests.lock().unwrap()); - // It is possible that work was queued for a child that got removed as - // part of a subsequent resolver_update. So, it is safe to ignore such a - // child here. 
- for child_id in children { - if let Some(child) = self.children.get_mut(&child_id) { - let mut channel_controller = WrappedController::new(channel_controller); - child.policy.work(&mut channel_controller); - self.resolve_child_controller(channel_controller, child_id.clone()); - } - } + fn work(&mut self, _channel_controller: &mut dyn ChannelController) { + todo!(); } } struct WrappedController<'a> { channel_controller: &'a mut dyn ChannelController, - created_subchannels: Vec>, + created_subchannels: Vec, picker_update: Option, } @@ -246,7 +232,7 @@ impl<'a> WrappedController<'a> { } impl ChannelController for WrappedController<'_> { - fn new_subchannel(&mut self, address: &Address) -> Arc { + fn new_subchannel(&mut self, address: &Address) -> Subchannel { let subchannel = self.channel_controller.new_subchannel(address); self.created_subchannels.push(subchannel.clone()); subchannel @@ -261,29 +247,10 @@ impl ChannelController for WrappedController<'_> { } } -struct ChildScheduler { - child_identifier: Arc, - work_requests: Arc>>>, - work_scheduler: Arc, -} - -impl ChildScheduler { - fn new( - child_identifier: Arc, - work_scheduler: Arc, - work_requests: Arc>>>, - ) -> Self { - Self { - child_identifier, - work_requests, - work_scheduler, - } - } -} +pub struct UnimplWorkScheduler; -impl WorkScheduler for ChildScheduler { +impl WorkScheduler for UnimplWorkScheduler { fn schedule_work(&self) { - (*self.work_requests.lock().unwrap()).insert(self.child_identifier.clone()); - self.work_scheduler.schedule_work(); + todo!(); } } From 89a80ff6d9988a3c5153f703bd5be331c6c8adb1 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 5 Jun 2025 14:51:27 -0700 Subject: [PATCH 2/2] revert to old design and implement pending work with hash set of indices --- .../client/load_balancing/child_manager.rs | 127 ++++++++++++------ grpc/src/client/load_balancing/mod.rs | 10 +- 2 files changed, 97 insertions(+), 40 deletions(-) diff --git 
a/grpc/src/client/load_balancing/child_manager.rs b/grpc/src/client/load_balancing/child_manager.rs index 10cdc11ef..2bf0d0473 100644 --- a/grpc/src/client/load_balancing/child_manager.rs +++ b/grpc/src/client/load_balancing/child_manager.rs @@ -23,10 +23,13 @@ // policy in use. Complete tests must be written before it can be used in // production. Also, support for the work scheduler is missing. +use std::collections::HashSet; +use std::sync::Mutex; use std::{collections::HashMap, error::Error, hash::Hash, mem, sync::Arc}; use crate::client::load_balancing::{ - ChannelController, LbConfig, LbPolicy, LbPolicyBuilder, LbPolicyOptions, LbState, WorkScheduler, + ChannelController, LbConfig, LbPolicy, LbPolicyBuilder, LbPolicyOptions, LbState, + WeakSubchannel, WorkScheduler, }; use crate::client::name_resolution::{Address, ResolverUpdate}; @@ -34,15 +37,19 @@ use super::{Subchannel, SubchannelState}; // An LbPolicy implementation that manages multiple children. pub struct ChildManager { - subchannel_child_map: HashMap, + subchannel_child_map: HashMap, children: Vec>, - shard_update: Box>, + update_sharder: Box>, + pending_work: Arc>>, } +pub trait ChildIdentifier: PartialEq + Hash + Eq + Send + Sync + 'static {} + struct Child { identifier: T, policy: Box, state: LbState, + work_scheduler: Arc, } /// A collection of data sent to a child of the ChildManager. @@ -56,23 +63,26 @@ pub struct ChildUpdate { pub child_update: ResolverUpdate, } -// TODO: convert to a trait? -/// Performs the operation of sharding an aggregate ResolverUpdate into one or -/// more ChildUpdates. Called automatically by the ChildManager when its -/// resolver_update method is called. -pub type ResolverUpdateSharder = - fn( - ResolverUpdate, +pub trait ResolverUpdateSharder: Send { + /// Performs the operation of sharding an aggregate ResolverUpdate into one + /// or more ChildUpdates. Called automatically by the ChildManager when its + /// resolver_update method is called. 
Each returned ChildUpdate's child_identifier is the + /// identifier the ChildManager should use for this child. + fn shard_update( + &self, + resolver_update: ResolverUpdate, ) -> Result>>, Box>; +} -impl ChildManager { +impl ChildManager { /// Creates a new ChildManager LB policy. shard_update is called whenever a /// resolver_update operation occurs. - pub fn new(shard_update: Box>) -> Self { + pub fn new(update_sharder: Box>) -> Self { Self { - subchannel_child_map: HashMap::default(), - children: Vec::default(), - shard_update, + update_sharder, + subchannel_child_map: Default::default(), + children: Default::default(), + pending_work: Default::default(), } } @@ -97,7 +107,7 @@ impl ChildManager { ) { // Add all created subchannels into the subchannel_child_map. for csc in channel_controller.created_subchannels { - self.subchannel_child_map.insert(csc, child_idx); + self.subchannel_child_map.insert(csc.into(), child_idx); } // Update the tracked state if the child produced an update. if let Some(state) = channel_controller.picker_update { @@ -106,7 +116,7 @@ impl ChildManager { } } -impl LbPolicy for ChildManager { +impl LbPolicy for ChildManager { fn resolver_update( &mut self, resolver_update: ResolverUpdate, @@ -114,20 +124,25 @@ impl LbPolicy for ChildManager { channel_controller: &mut dyn ChannelController, ) -> Result<(), Box> { // First determine if the incoming update is valid. - let child_updates = (self.shard_update)(resolver_update)?; + let child_updates = self.update_sharder.shard_update(resolver_update)?; + + // Hold the lock to prevent new work requests during this operation and + // rewrite the indices. + let mut pending_work = self.pending_work.lock().unwrap(); + + // Reset pending work; we will re-add any entries it contains with the + // right index later. + let old_pending_work = mem::take(&mut *pending_work); // Replace self.children with an empty vec. 
- let mut old_children = vec![]; - mem::swap(&mut self.children, &mut old_children); + let old_children = mem::take(&mut self.children); // Replace the subchannel map with an empty map. - let mut old_subchannel_child_map = HashMap::new(); - mem::swap( - &mut self.subchannel_child_map, - &mut old_subchannel_child_map, - ); + let old_subchannel_child_map = mem::take(&mut self.subchannel_child_map); + // Reverse the old subchannel map. - let mut old_child_subchannels_map: HashMap> = HashMap::new(); + let mut old_child_subchannels_map: HashMap> = HashMap::new(); + for (subchannel, child_idx) in old_subchannel_child_map { old_child_subchannels_map .entry(child_idx) @@ -139,7 +154,7 @@ impl LbPolicy for ChildManager { let old_children = old_children .into_iter() .enumerate() - .map(|(old_idx, e)| (e.identifier, (e.policy, e.state, old_idx))); + .map(|(old_idx, e)| (e.identifier, (e.policy, e.state, old_idx, e.work_scheduler))); let mut old_children: HashMap = old_children.collect(); // Split the child updates into the IDs and builders, and the @@ -152,7 +167,8 @@ impl LbPolicy for ChildManager { // update, and create new children. Add entries back into the // subchannel map. 
for (new_idx, (identifier, builder)) in ids_builders.into_iter().enumerate() { - if let Some((policy, state, old_idx)) = old_children.remove(&identifier) { + if let Some((policy, state, old_idx, work_scheduler)) = old_children.remove(&identifier) + { for subchannel in old_child_subchannels_map .remove(&old_idx) .into_iter() @@ -160,24 +176,43 @@ impl LbPolicy for ChildManager { { self.subchannel_child_map.insert(subchannel, new_idx); } + if old_pending_work.contains(&old_idx) { + pending_work.insert(new_idx); + } + *work_scheduler.idx.lock().unwrap() = Some(new_idx); self.children.push(Child { identifier, state, policy, + work_scheduler, }); } else { + let work_scheduler = Arc::new(ChildWorkScheduler { + pending_work: self.pending_work.clone(), + idx: Mutex::new(Some(new_idx)), + }); let policy = builder.build(LbPolicyOptions { - work_scheduler: Arc::new(UnimplWorkScheduler {}), + work_scheduler: work_scheduler.clone(), }); let state = LbState::initial(); self.children.push(Child { identifier, state, policy, + work_scheduler, }); }; } + // Invalidate all deleted children's work_schedulers. + for (_, (_, _, _, work_scheduler)) in old_children { + *work_scheduler.idx.lock().unwrap() = None; + } + + // Release the pending_work mutex before calling into the children to + // allow their work scheduler calls to unblock. + drop(pending_work); + // Anything left in old_children will just be Dropped and cleaned up. // Call resolver_update on all children. @@ -196,12 +231,15 @@ impl LbPolicy for ChildManager { fn subchannel_update( &mut self, - subchannel: &Subchannel, + subchannel: Arc, state: &SubchannelState, channel_controller: &mut dyn ChannelController, ) { // Determine which child created this subchannel. 
- let child_idx = *self.subchannel_child_map.get(subchannel).unwrap(); + let child_idx = *self + .subchannel_child_map + .get(&WeakSubchannel::new(&subchannel)) + .unwrap(); let policy = &mut self.children[child_idx].policy; // Wrap the channel_controller to track the child's operations. let mut channel_controller = WrappedController::new(channel_controller); @@ -210,14 +248,21 @@ impl LbPolicy for ChildManager { self.resolve_child_controller(channel_controller, child_idx); } - fn work(&mut self, _channel_controller: &mut dyn ChannelController) { - todo!(); + fn work(&mut self, channel_controller: &mut dyn ChannelController) { + let child_idxes = mem::take(&mut *self.pending_work.lock().unwrap()); + for child_idx in child_idxes { + let mut channel_controller = WrappedController::new(channel_controller); + self.children[child_idx] + .policy + .work(&mut channel_controller); + self.resolve_child_controller(channel_controller, child_idx); + } } } struct WrappedController<'a> { channel_controller: &'a mut dyn ChannelController, - created_subchannels: Vec, + created_subchannels: Vec>, picker_update: Option, } @@ -232,7 +277,7 @@ impl<'a> WrappedController<'a> { } impl ChannelController for WrappedController<'_> { - fn new_subchannel(&mut self, address: &Address) -> Subchannel { + fn new_subchannel(&mut self, address: &Address) -> Arc { let subchannel = self.channel_controller.new_subchannel(address); self.created_subchannels.push(subchannel.clone()); subchannel @@ -247,10 +292,16 @@ impl ChannelController for WrappedController<'_> { } } -pub struct UnimplWorkScheduler; +struct ChildWorkScheduler { + pending_work: Arc>>, // Must be taken first for correctness + idx: Mutex>, // None if the child is deleted. 
+} -impl WorkScheduler for UnimplWorkScheduler { +impl WorkScheduler for ChildWorkScheduler { fn schedule_work(&self) { - todo!(); + let mut pending_work = self.pending_work.lock().unwrap(); + if let Some(idx) = *self.idx.lock().unwrap() { + pending_work.insert(idx); + } } } diff --git a/grpc/src/client/load_balancing/mod.rs b/grpc/src/client/load_balancing/mod.rs index 3fe57cc0c..fa9e32767 100644 --- a/grpc/src/client/load_balancing/mod.rs +++ b/grpc/src/client/load_balancing/mod.rs @@ -403,10 +403,16 @@ impl Display for dyn Subchannel { struct WeakSubchannel(Weak); -impl WeakSubchannel { - pub fn new(subchannel: Arc) -> Self { +impl From> for WeakSubchannel { + fn from(subchannel: Arc) -> Self { WeakSubchannel(Arc::downgrade(&subchannel)) } +} + +impl WeakSubchannel { + pub fn new(subchannel: &Arc) -> Self { + WeakSubchannel(Arc::downgrade(subchannel)) + } pub fn upgrade(&self) -> Option> { self.0.upgrade()