diff --git a/src/units/zone_server.rs b/src/units/zone_server.rs index e77a74bd..ac686009 100644 --- a/src/units/zone_server.rs +++ b/src/units/zone_server.rs @@ -46,8 +46,8 @@ use crate::manager::Terminated; use crate::manager::record_zone_event; use crate::util::AbortOnDrop; use crate::zone::{ - HistoricalEvent, SignedZoneVersionState, SigningTrigger, UnsignedZoneVersionState, ZoneHandle, - ZoneVersionReviewState, + HistoricalEvent, SignedZoneVersionState, SigningTrigger, UnsignedZoneVersionState, Zone, + ZoneHandle, ZoneVersionReviewState, }; /// The source of a zone server. @@ -367,7 +367,7 @@ impl ZoneServer { "[{unit_name}]: Cannot promote unsigned zone '{zone_name}' to the signable set of zones: {err}" ); } else { - self.on_unsigned_zone_approved(center, zone_name, zone_serial); + self.on_unsigned_zone_approved(center, &zone, zone_serial); } } Source::Signed => { @@ -526,21 +526,29 @@ impl ZoneServer { fn on_unsigned_zone_approved( &self, center: &Arc
, - zone_name: Name, + zone: &Arc, zone_serial: Serial, ) { - record_zone_event( - center, - &zone_name, - HistoricalEvent::UnsignedZoneReview { - status: crate::api::ZoneReviewStatus::Approved, - }, - Some(zone_serial), - ); + { + let mut zone_state = zone.state.lock().unwrap(); + zone_state.record_event( + HistoricalEvent::UnsignedZoneReview { + status: crate::api::ZoneReviewStatus::Approved, + }, + Some(zone_serial), + ); + ZoneHandle { + zone, + state: &mut zone_state, + center, + } + .storage() + .approve_loaded(); + } info!("[CC]: Instructing zone signer to sign the approved zone"); center.signer.on_sign_zone( center, - zone_name, + zone.name.clone(), Some(zone_serial), SigningTrigger::ZoneChangesApproved, ); @@ -637,16 +645,6 @@ impl ZoneServer { } version.review = new_review_state; - - if matches!(decision, ZoneReviewDecision::Approve) { - ZoneHandle { - zone: &zone, - state: &mut zone_state, - center, - } - .storage() - .approve_loaded(); - } } if matches!(decision, ZoneReviewDecision::Approve) { info!( @@ -654,7 +652,7 @@ impl ZoneServer { ); match Self::promote_zone_to_signable(center.clone(), &zone_name) { Ok(()) => { - self.on_unsigned_zone_approved(center, zone_name, zone_serial); + self.on_unsigned_zone_approved(center, &zone, zone_serial); } Err(err) => { error!( diff --git a/src/util.rs b/src/util.rs index 78f4ab08..1ce4797f 100644 --- a/src/util.rs +++ b/src/util.rs @@ -15,6 +15,7 @@ use tokio::{ task::{AbortHandle, JoinHandle}, time::Instant, }; +use tracing::{Instrument, trace}; //----------- AbortOnDrop ------------------------------------------------------ @@ -49,6 +50,71 @@ impl AbortOnDrop { } } +//----------- BackgroundTasks -------------------------------------------------- + +/// A monitor for background tasks. +/// +/// This allows tracking background tasks for a particular purpose. +/// +/// If a [`BackgroundTasks`] is dropped, all associated tasks are aborted. +#[derive(Default)] +pub struct BackgroundTasks { + /// The underlying tasks. + tasks: foldhash::HashMap>, +} + +impl BackgroundTasks { + /// Spawn a new background task. + pub fn spawn(&mut self, span: tracing::Span, f: F) + where + F: Future + Send + 'static, + { + let task = tokio::task::spawn(f.instrument(span)); + let other = self.tasks.insert(task.id(), task); + assert!( + other.is_none(), + "Task IDs for two live 'JoinHandles' never collide" + ); + } + + /// Spawn a new blocking background task. + pub fn spawn_blocking(&mut self, span: tracing::Span, f: F) + where + F: FnOnce() + Send + 'static, + { + let task = tokio::task::spawn_blocking(move || span.in_scope(f)); + let other = self.tasks.insert(task.id(), task); + assert!( + other.is_none(), + "Task IDs for two live 'JoinHandles' never collide" + ); + } + + /// Mark the running task as finished. + #[tracing::instrument(level = "trace", skip_all)] + pub fn finish(&mut self) { + let id = tokio::task::id(); + match self.tasks.remove(&id) { + Some(handle) => { + trace!("Detaching background task"); + std::mem::drop(handle); + } + None => { + trace!("Inconsistency: unknown task"); + } + } + } +} + +impl Drop for BackgroundTasks { + fn drop(&mut self) { + // Abort all tasks. + self.tasks.drain().for_each(|(_, handle)| handle.abort()); + } +} + +//------------------------------------------------------------------------------ + /// Force a [`Future`] to evaluate synchronously. pub fn force_future(future: F) -> F::Output { let waker = std::task::Waker::noop(); diff --git a/src/zone/storage.rs b/src/zone/storage.rs index 7b817729..b0b96855 100644 --- a/src/zone/storage.rs +++ b/src/zone/storage.rs @@ -15,7 +15,7 @@ use tracing::{info, trace, trace_span, warn}; use crate::{ center::Center, - util::{AbortOnDrop, force_future}, + util::{BackgroundTasks, force_future}, zone::{HistoricalEvent, PipelineMode, Zone, ZoneHandle, ZoneState}, }; @@ -58,6 +58,11 @@ impl StorageZoneHandle<'_> { /// /// If the zone data storage is busy, [`None`] is returned; the loader /// should enqueue the load operation and wait for an idle notification. + #[tracing::instrument( + level = "trace", + skip_all, + fields(zone = %self.zone.name), + )] pub fn start_load(&mut self) -> Option { // Examine the current state. let machine = &mut self.state.storage.machine; @@ -66,7 +71,8 @@ impl StorageZoneHandle<'_> { // The zone storage is passive; no other operations are ongoing, // and it is possible to begin building a new instance. trace!( - zone = %self.zone.name, + machine.old = "Passive", + machine.new = "Loading", "Obtaining a 'LoadedZoneBuilder' for performing a load" ); @@ -92,13 +98,19 @@ impl StorageZoneHandle<'_> { /// /// The prepared loaded instance of the zone is finalized, and passed on /// to the loaded zone reviewer. + #[tracing::instrument( + level = "trace", + skip_all, + fields(zone = %self.zone.name), + )] pub fn finish_load(&mut self, built: LoadedZoneBuilt) { // Examine the current state. let machine = &mut self.state.storage.machine; match machine.take() { ZoneDataStorage::Loading(s) => { trace!( - zone = %self.zone.name, + machine.old = "Loading", + machine.new = "ReviewLoadedPending", "Successfully finishing the ongoing load" ); @@ -126,13 +138,19 @@ impl StorageZoneHandle<'_> { /// /// Any intermediate artifacts will be cleaned up automatically, in the /// background. Once the zone storage is idle, a notification will be sent. + #[tracing::instrument( + level = "trace", + skip_all, + fields(zone = %self.zone.name), + )] pub fn give_up_load(&mut self, builder: LoadedZoneBuilder) { // Examine the current state. let machine = &mut self.state.storage.machine; match machine.take() { ZoneDataStorage::Loading(s) => { trace!( - zone = %self.zone.name, + machine.old = "Loading", + machine.new = "Cleaning", "Giving up on the ongoing load" ); @@ -151,14 +169,19 @@ impl StorageZoneHandle<'_> { /// # Loader Review Operations impl StorageZoneHandle<'_> { /// Initiate review of a new loaded instance of a zone. + #[tracing::instrument( + level = "trace", + skip_all, + fields(zone = %self.zone.name), + )] fn start_loaded_review(&mut self, loaded_reviewer: LoadedZoneReviewer) { // NOTE: This function provides compatibility with 'zonetree's. let zone = self.zone.clone(); let center = self.center.clone(); - let task = tokio::task::spawn_blocking(move || { - let span = trace_span!("start_loaded_review", zone = %zone.name); - let _guard = span.enter(); + let span = trace_span!("start_loaded_review"); + self.state.storage.background_tasks.spawn_blocking(span, move || { + trace!("Converting the loaded instance to 'zonetree'"); // Read the loaded instance. let reader = loaded_reviewer @@ -199,7 +222,11 @@ impl StorageZoneHandle<'_> { std::mem::replace(&mut state.storage.loaded_reviewer, loaded_reviewer); // Transition into the reviewing state. - tracing::debug!("Transitioning zone state..."); + trace!( + machine.old = "ReviewLoadedPending", + machine.new = "ReviewingLoaded", + "Initiating loaded review" + ); match state.storage.machine.take() { ZoneDataStorage::ReviewLoadedPending(s) => { let s = s.start(old_loaded_reviewer); @@ -226,24 +253,8 @@ impl StorageZoneHandle<'_> { state = zone.state.lock().unwrap(); } - // Clean up the background task. - // - // NOTE: The outer function is known to have finished by this - // point (due to the above zone state lock), and it will set - // 'background_task'. Thus, a race condition is impossible. - let task = state - .storage - .background_task - .take() - .expect("The background task 'task' has been set"); - assert_eq!( - task.id(), - tokio::task::id(), - "A different background task is registered" - ); + state.storage.background_tasks.finish(); }); - - self.state.storage.background_task = Some(task.into()); } /// Build a `zonetree` for an loaded instance of a zone. @@ -272,6 +283,11 @@ impl StorageZoneHandle<'_> { } /// Approve a loaded instance of a zone. + #[tracing::instrument( + level = "trace", + skip_all, + fields(zone = %self.zone.name), + )] pub fn approve_loaded(&mut self) { // Examine the current state. let machine = &mut self.state.storage.machine; @@ -283,10 +299,7 @@ impl StorageZoneHandle<'_> { "The loaded instance has been approved" ); - trace!( - zone = %self.zone.name, - "Persisting the loaded instance" - ); + trace!("Persisting the loaded instance"); let (s, persister) = s.mark_approved(); *machine = ZoneDataStorage::PersistingLoaded(s); @@ -304,10 +317,18 @@ impl StorageZoneHandle<'_> { /// /// A background task will be spawned to perform the provided zone cleaning /// and transition to the next state. + #[tracing::instrument( + level = "trace", + skip_all, + fields(zone = %self.zone.name), + )] fn start_cleanup(&mut self, cleaner: ZoneCleaner) { let zone = self.zone.clone(); let center = self.center.clone(); - let task = tokio::task::spawn_blocking(move || { + let span = trace_span!("clean"); + self.state.storage.background_tasks.spawn_blocking(span, move || { + trace!("Cleaning the zone"); + // Perform the cleaning. let cleaned = cleaner.clean(); @@ -322,6 +343,8 @@ impl StorageZoneHandle<'_> { state: &mut state, center: ¢er, }; + + trace!("Transitioning the state machine to 'Passive'"); let machine = &mut handle.state.storage.machine; match machine.take() { ZoneDataStorage::Cleaning(s) => { @@ -334,38 +357,29 @@ impl StorageZoneHandle<'_> { ), } - // Clean up the background task. - // - // NOTE: The outer function is known to have finished by this - // point (due to the above zone state lock), and it will set - // 'background_task'. Thus, a race condition is impossible. - let task = handle - .state - .storage - .background_task - .take() - .expect("The background task 'task' has been set"); - assert_eq!( - task.id(), - tokio::task::id(), - "A different background task is registered" - ); - // Notify the rest of Cascade that the storage is idle. handle.storage().on_idle(); - }); - self.state.storage.background_task = Some(task.into()); + handle.state.storage.background_tasks.finish(); + }); } /// Begin persisting a loaded zone instance. /// /// A background task will be spawned to perform the provided zone /// persistence and transition to the next state. + #[tracing::instrument( + level = "trace", + skip_all, + fields(zone = %self.zone.name), + )] fn start_loaded_persistence(&mut self, persister: LoadedZonePersister) { let zone = self.zone.clone(); let center = self.center.clone(); - let task = tokio::task::spawn_blocking(move || { + let span = trace_span!("persist_loaded"); + self.state.storage.background_tasks.spawn_blocking(span, move || { + trace!("Persisting the loaded instance"); + // Perform the persisting. let persisted = persister.persist(); @@ -379,28 +393,13 @@ impl StorageZoneHandle<'_> { center: ¢er, }; - // Clean up the background task. - // - // NOTE: The outer function is known to have finished by this - // point (due to the above zone state lock), and it will set - // 'background_task'. Thus, a race condition is impossible. - let task = handle - .state - .storage - .background_task - .take() - .expect("The background task 'task' has been set"); - assert_eq!( - task.id(), - tokio::task::id(), - "A different background task is registered" - ); - // Transition the state machine. + trace!("Finished persisting"); let machine = &mut handle.state.storage.machine; match machine.take() { ZoneDataStorage::PersistingLoaded(s) => { // For now, transition all the way back to 'Passive' state. + trace!("Transitioning the state machine to 'Cleaning'"); let (s, mut builder) = s.mark_complete(persisted); builder.clear(); let built = builder.finish().unwrap_or_else(|_| unreachable!()); @@ -424,9 +423,9 @@ impl StorageZoneHandle<'_> { // Notify the rest of Cascade that the storage is idle. handle.storage().on_idle(); - }); - self.state.storage.background_task = Some(task.into()); + handle.state.storage.background_tasks.finish(); + }); } /// Respond to the data storage idling. @@ -434,6 +433,11 @@ impl StorageZoneHandle<'_> { /// When the data storage idles, it is possible to initiate a new load or /// resigning of the zone. This method checks for enqueued loads or resigns /// and begins them appropriately. + #[tracing::instrument( + level = "trace", + skip_all, + fields(zone = %self.zone.name), + )] fn on_idle(&mut self) { // TODO: Check whether resigning is needed. It has higher priority than // loading a new instance. @@ -470,11 +474,11 @@ pub struct StorageState { // TODO: Move into the zone server unit. viewer: ZoneViewer, - /// An ongoing background task for the zone data. + /// Ongoing background tasks. /// /// When the zone data needs to be cleaned or persisted, a background task /// is automatically spawned and tracked here. - background_task: Option, + background_tasks: BackgroundTasks, } impl StorageState { @@ -487,7 +491,7 @@ impl StorageState { loaded_reviewer, signed_reviewer, viewer, - background_task: None, + background_tasks: Default::default(), } } }