Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 46 additions & 3 deletions src/api/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,33 @@ mod tests {
number = "1.1.2.3",
text = "The provider mutator function MUST invoke the shutdown function on the previously registered provider once it's no longer being used to resolve flag values."
)]
#[test]
fn invoke_shutdown_on_old_provider_checked_by_type_system() {}
#[tokio::test]
async fn invoke_shutdown_on_old_provider() {
use std::sync::Arc;
use tokio::sync::Notify;
use tokio::time::{timeout, Duration};

let shutdown_notify = Arc::new(Notify::new());
let shutdown_notify_for_provider = shutdown_notify.clone();

let mut old_provider = MockFeatureProvider::new();
old_provider.expect_initialize().returning(|_| {});
old_provider
.expect_shutdown()
.returning(move || shutdown_notify_for_provider.notify_one())
.once();

let mut new_provider = MockFeatureProvider::new();
new_provider.expect_initialize().returning(|_| {});

let mut api = OpenFeature::default();
api.set_provider(old_provider).await;
api.set_provider(new_provider).await;

timeout(Duration::from_millis(200), shutdown_notify.notified())
.await
.expect("previous provider shutdown not invoked");
}

#[spec(
number = "1.1.3",
Expand Down Expand Up @@ -321,10 +346,28 @@ mod tests {
)]
#[tokio::test]
async fn shutdown() {
use std::sync::Arc;
use tokio::sync::Notify;
use tokio::time::{timeout, Duration};

let shutdown_notify = Arc::new(Notify::new());
let shutdown_notify_for_provider = shutdown_notify.clone();

let mut api = OpenFeature::default();
api.set_provider(NoOpProvider::default()).await;
let mut provider = MockFeatureProvider::new();
provider.expect_initialize().returning(|_| {});
provider
.expect_shutdown()
.returning(move || shutdown_notify_for_provider.notify_one())
.once();

api.set_provider(provider).await;

api.shutdown().await;

timeout(Duration::from_millis(200), shutdown_notify.notified())
.await
.expect("shutdown did not invoke provider shutdown");
}

#[spec(
Expand Down
68 changes: 60 additions & 8 deletions src/api/provider_registry.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{borrow::Borrow, collections::HashMap};

use tokio::sync::RwLock;
use tokio::task::JoinHandle;

use crate::provider::{FeatureProvider, NoOpProvider};

Expand Down Expand Up @@ -32,20 +34,26 @@ impl ProviderRegistry {
}

pub async fn set_default<T: FeatureProvider>(&self, mut provider: T) {
let mut map = self.providers.write().await;
map.remove("");
let old_provider = self.providers.write().await.remove("");

if let Some(old_provider) = old_provider {
old_provider.shutdown_in_background();
}

provider
.initialize(self.global_evaluation_context.get().await.borrow())
.await;

map.insert(String::default(), FeatureProviderWrapper::new(provider));
self.providers
.write()
.await
.insert(String::default(), FeatureProviderWrapper::new(provider));
}

pub async fn set_named<T: FeatureProvider>(&self, name: &str, mut provider: T) {
// Drop the already registered provider if any.
if self.get_named(name).await.is_some() {
self.providers.write().await.remove(name);
if let Some(old_provider) = self.providers.write().await.remove(name) {
old_provider.shutdown_in_background();
}

provider
Expand Down Expand Up @@ -74,7 +82,21 @@ impl ProviderRegistry {
}

pub async fn clear(&self) {
let providers: Vec<FeatureProviderWrapper> =
self.providers.read().await.values().cloned().collect();

let mut shutdown_handles = Vec::with_capacity(providers.len());
for provider in providers {
if let Some(handle) = provider.shutdown_in_background() {
shutdown_handles.push(handle);
}
}

self.providers.write().await.clear();

for handle in shutdown_handles {
let _ = handle.await;
}
}
}

Expand All @@ -89,14 +111,44 @@ impl Default for ProviderRegistry {
// ============================================================

#[derive(Clone)]
pub struct FeatureProviderWrapper(Arc<dyn FeatureProvider>);
pub struct FeatureProviderWrapper(Arc<ProviderEntry>);

impl FeatureProviderWrapper {
pub fn new(provider: impl FeatureProvider) -> Self {
Self(Arc::new(provider))
Self(Arc::new(ProviderEntry::new(provider)))
}

pub fn get(&self) -> Arc<dyn FeatureProvider> {
self.0.clone()
self.0.provider.clone()
}

pub fn shutdown_in_background(&self) -> Option<JoinHandle<()>> {
if self
.0
.shutdown_started
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
let provider = self.get();
Some(tokio::spawn(async move {
provider.shutdown().await;
}))
} else {
None
}
}
}

struct ProviderEntry {
provider: Arc<dyn FeatureProvider>,
shutdown_started: AtomicBool,
}

impl ProviderEntry {
fn new(provider: impl FeatureProvider) -> Self {
Self {
provider: Arc::new(provider),
shutdown_started: AtomicBool::new(false),
}
}
}
4 changes: 4 additions & 0 deletions src/provider/feature_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ pub trait FeatureProvider: Send + Sync + 'static {
ProviderStatus::Ready
}

/// The provider MAY define a mechanism to gracefully shutdown and dispose of resources.
#[allow(unused_variables)]
async fn shutdown(&self) {}

/// The provider interface MUST define a metadata member or accessor, containing a name field
/// or accessor of type string, which identifies the provider implementation.
fn metadata(&self) -> &ProviderMetadata;
Expand Down