From 66502c3fa9922c94ad6c875f534b9669a6ed17b3 Mon Sep 17 00:00:00 2001 From: Fabrizio Demaria Date: Thu, 15 Jan 2026 16:50:55 +0100 Subject: [PATCH] fix: shutdown called on providers Signed-off-by: Fabrizio Demaria --- src/api/api.rs | 49 +++++++++++++++++++++-- src/api/provider_registry.rs | 68 ++++++++++++++++++++++++++++---- src/provider/feature_provider.rs | 4 ++ 3 files changed, 110 insertions(+), 11 deletions(-) diff --git a/src/api/api.rs b/src/api/api.rs index 3344be7..0c25bdc 100644 --- a/src/api/api.rs +++ b/src/api/api.rs @@ -199,8 +199,33 @@ mod tests { number = "1.1.2.3", text = "The provider mutator function MUST invoke the shutdown function on the previously registered provider once it's no longer being used to resolve flag values." )] - #[test] - fn invoke_shutdown_on_old_provider_checked_by_type_system() {} + #[tokio::test] + async fn invoke_shutdown_on_old_provider() { + use std::sync::Arc; + use tokio::sync::Notify; + use tokio::time::{timeout, Duration}; + + let shutdown_notify = Arc::new(Notify::new()); + let shutdown_notify_for_provider = shutdown_notify.clone(); + + let mut old_provider = MockFeatureProvider::new(); + old_provider.expect_initialize().returning(|_| {}); + old_provider + .expect_shutdown() + .returning(move || shutdown_notify_for_provider.notify_one()) + .once(); + + let mut new_provider = MockFeatureProvider::new(); + new_provider.expect_initialize().returning(|_| {}); + + let mut api = OpenFeature::default(); + api.set_provider(old_provider).await; + api.set_provider(new_provider).await; + + timeout(Duration::from_millis(200), shutdown_notify.notified()) + .await + .expect("previous provider shutdown not invoked"); + } #[spec( number = "1.1.3", @@ -321,10 +346,28 @@ mod tests { )] #[tokio::test] async fn shutdown() { + use std::sync::Arc; + use tokio::sync::Notify; + use tokio::time::{timeout, Duration}; + + let shutdown_notify = Arc::new(Notify::new()); + let shutdown_notify_for_provider = shutdown_notify.clone(); + let mut api = OpenFeature::default(); - api.set_provider(NoOpProvider::default()).await; + let mut provider = MockFeatureProvider::new(); + provider.expect_initialize().returning(|_| {}); + provider + .expect_shutdown() + .returning(move || shutdown_notify_for_provider.notify_one()) + .once(); + + api.set_provider(provider).await; api.shutdown().await; + + timeout(Duration::from_millis(200), shutdown_notify.notified()) + .await + .expect("shutdown did not invoke provider shutdown"); } #[spec( diff --git a/src/api/provider_registry.rs b/src/api/provider_registry.rs index 753dd83..f59f9de 100644 --- a/src/api/provider_registry.rs +++ b/src/api/provider_registry.rs @@ -1,7 +1,9 @@ +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::{borrow::Borrow, collections::HashMap}; use tokio::sync::RwLock; +use tokio::task::JoinHandle; use crate::provider::{FeatureProvider, NoOpProvider}; @@ -32,20 +34,26 @@ impl ProviderRegistry { } pub async fn set_default(&self, mut provider: T) { - let mut map = self.providers.write().await; - map.remove(""); + let old_provider = self.providers.write().await.remove(""); + + if let Some(old_provider) = old_provider { + old_provider.shutdown_in_background(); + } provider .initialize(self.global_evaluation_context.get().await.borrow()) .await; - map.insert(String::default(), FeatureProviderWrapper::new(provider)); + self.providers + .write() + .await + .insert(String::default(), FeatureProviderWrapper::new(provider)); } pub async fn set_named(&self, name: &str, mut provider: T) { // Drop the already registered provider if any. - if self.get_named(name).await.is_some() { - self.providers.write().await.remove(name); + if let Some(old_provider) = self.providers.write().await.remove(name) { + old_provider.shutdown_in_background(); } provider @@ -74,7 +82,21 @@ impl ProviderRegistry { } pub async fn clear(&self) { + let providers: Vec = + self.providers.read().await.values().cloned().collect(); + + let mut shutdown_handles = Vec::with_capacity(providers.len()); + for provider in providers { + if let Some(handle) = provider.shutdown_in_background() { + shutdown_handles.push(handle); + } + } + self.providers.write().await.clear(); + + for handle in shutdown_handles { + let _ = handle.await; + } } } @@ -89,14 +111,44 @@ impl Default for ProviderRegistry { // ============================================================ #[derive(Clone)] -pub struct FeatureProviderWrapper(Arc); +pub struct FeatureProviderWrapper(Arc); impl FeatureProviderWrapper { pub fn new(provider: impl FeatureProvider) -> Self { - Self(Arc::new(provider)) + Self(Arc::new(ProviderEntry::new(provider))) } pub fn get(&self) -> Arc { - self.0.clone() + self.0.provider.clone() + } + + pub fn shutdown_in_background(&self) -> Option> { + if self + .0 + .shutdown_started + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + let provider = self.get(); + Some(tokio::spawn(async move { + provider.shutdown().await; + })) + } else { + None + } + } +} + +struct ProviderEntry { + provider: Arc, + shutdown_started: AtomicBool, +} + +impl ProviderEntry { + fn new(provider: impl FeatureProvider) -> Self { + Self { + provider: Arc::new(provider), + shutdown_started: AtomicBool::new(false), + } } } diff --git a/src/provider/feature_provider.rs b/src/provider/feature_provider.rs index ecf2261..b303c22 100644 --- a/src/provider/feature_provider.rs +++ b/src/provider/feature_provider.rs @@ -45,6 +45,10 @@ pub trait FeatureProvider: Send + Sync + 'static { ProviderStatus::Ready } + /// The provider MAY define a mechanism to gracefully shutdown and dispose of resources. + #[allow(unused_variables)] + async fn shutdown(&self) {} + /// The provider interface MUST define a metadata member or accessor, containing a name field /// or accessor of type string, which identifies the provider implementation. fn metadata(&self) -> &ProviderMetadata;