From 9a6c7f26edff5f526cbd55fbbc48b82796781983 Mon Sep 17 00:00:00 2001 From: sanaz Date: Fri, 12 Sep 2025 09:57:24 -0700 Subject: [PATCH 01/17] simplifies the context API --- Cargo.toml | 4 +- src/core/context/examples.rs | 206 ++++++++++++++++++++++++++++++ src/core/context/mod.rs | 240 +++++++++++++++++++++++++++++++++++ src/core/mod.rs | 2 + src/node/base_node.rs | 29 ++++- 5 files changed, 473 insertions(+), 8 deletions(-) create mode 100644 src/core/context/examples.rs create mode 100644 src/core/context/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 546bcce..78cf720 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,4 +15,6 @@ tracing-subscriber = "0.3" tracing-appender = "0.2" tracing-log = "0.1" unimock = "0.6" -parking_lot = "0.12" \ No newline at end of file +parking_lot = "0.12" +tokio = { version = "1.0", features = ["sync", "time", "macros", "rt", "rt-multi-thread"] } +tokio-util = "0.7" \ No newline at end of file diff --git a/src/core/context/examples.rs b/src/core/context/examples.rs new file mode 100644 index 0000000..f183326 --- /dev/null +++ b/src/core/context/examples.rs @@ -0,0 +1,206 @@ +//! Examples demonstrating CancelableContext usage patterns +//! +//! This module shows how to use the simplified cancelable context for +//! cancellation and irrecoverable error handling. + +use crate::core::context::CancelableContext; +use anyhow::{anyhow, Result}; +use tokio::time::{sleep, Duration}; +use tracing::Span; + +/// Example: Basic cancellation usage +pub async fn basic_cancellation_example(parent_span: &Span) -> Result<()> { + let ctx = CancelableContext::new(parent_span); + let child_ctx = ctx.child(); + + // Spawn background work with child context + // this will start in a separate thread + // it runs for a few seconds unless cancelled + let work_handle = tokio::spawn( // spawn a new asynchronous task + { + // use a clone of the child context + let work_ctx = child_ctx.clone(); + async move { + work_ctx.run(async { + tracing::info!("Starting long-running operation..."); + sleep(Duration::from_secs(5)).await; + tracing::info!("Operation completed"); + Ok(()) + }).await + } + } + ); + + // Cancel after a short time + tokio::spawn({ + let cancel_ctx = ctx.clone(); + async move { + sleep(Duration::from_millis(100)).await; + tracing::info!("Cancelling operation"); + cancel_ctx.cancel(); + } + }); + + // Match against an expected outcome (cancellation) + match work_handle.await { + Ok(Err(_)) => { + tracing::info!("✓ Work was cancelled as expected"); + Ok(()) // SUCCESS: Cancellation worked + } + Ok(Ok(())) => { + tracing::error!("✗ Work completed but should have been cancelled"); + Err(anyhow!("Expected cancellation but work completed successfully")) + } + Err(e) => { + tracing::error!("✗ Work panicked unexpectedly: {}", e); + Err(anyhow!("Work panicked: {}", e)) + } + } + +} + +/// Example: Server startup with irrecoverable error handling +/// All operations are critical - if any fail, the program should terminate +/// In this example, all operations succeed for demonstration purposes +pub async fn startup_example(parent_span: &Span) -> Result<()> { + let ctx = CancelableContext::new(parent_span); + + let startup_operations = vec![ + "initialize_network", + "load_configuration", + "setup_routing", + "register_services", + ]; + + for operation in startup_operations { + // Use run_or_throw for critical startup operations + // If any fails, the program terminates + ctx.run_or_throw(simulate_startup_operation(operation)).await; + tracing::info!("Completed startup operation: {}", operation); + } + + tracing::info!("Server startup completed successfully"); + Ok(()) +} + +/// Example: Hierarchical cancellation +pub async fn hierarchical_cancellation_example(parent_span: &Span) -> Result<()> { + let root_ctx = CancelableContext::new(parent_span); + let service1_ctx = root_ctx.child(); + let service2_ctx = root_ctx.child(); + + // Spawn multiple services + let service1_handle = tokio::spawn({ + let ctx = service1_ctx.clone(); + async move { + ctx.run(async { + tracing::info!("Service 1 starting"); + sleep(Duration::from_secs(2)).await; + tracing::info!("Service 1 completed"); + Ok(()) + }).await + } + }); + + let service2_handle = tokio::spawn({ + let ctx = service2_ctx.clone(); + async move { + ctx.run(async { + tracing::info!("Service 2 starting"); + sleep(Duration::from_secs(2)).await; + tracing::info!("Service 2 completed"); + Ok(()) + }).await + } + }); + + // Cancel root after short time - should cancel all services + tokio::spawn({ + let ctx = root_ctx.clone(); + async move { + sleep(Duration::from_millis(50)).await; + tracing::info!("Shutting down all services"); + ctx.cancel(); + } + }); + + let (result1, result2) = tokio::join!(service1_handle, service2_handle); + + match result1 { + Ok(Ok(())) => tracing::info!("Service 1 completed"), + Ok(Err(e)) => tracing::info!("Service 1 cancelled: {}", e), + Err(e) => tracing::error!("Service 1 panicked: {}", e), + } + + match result2 { + Ok(Ok(())) => tracing::info!("Service 2 completed"), + Ok(Err(e)) => tracing::info!("Service 2 cancelled: {}", e), + Err(e) => tracing::error!("Service 2 panicked: {}", e), + } + + Ok(()) +} + +/// Example: Error propagation pattern +pub async fn error_propagation_example(parent_span: &Span) -> Result<()> { + let root_ctx = CancelableContext::new(parent_span); + let child_ctx = root_ctx.child(); + let _grandchild_ctx = child_ctx.child(); + + // Simulate a critical error in deeply nested operation + let critical_error = anyhow!("Critical database connection failed"); + + // In a real scenario, uncommenting the line below would terminate the program: + // grandchild_ctx.throw_irrecoverable(critical_error); + + // For demo purposes, just log what would happen + tracing::info!("Would propagate error through context hierarchy: {}", critical_error); + tracing::info!("Error would bubble from grandchild -> child -> root -> program exit"); + + Ok(()) +} + +// Helper function to simulate startup operations +// Fails if operation_name is "fail_critical" +// otherwise succeeds after a short delay +async fn simulate_startup_operation(operation_name: &str) -> Result<()> { + // Simulate work + sleep(Duration::from_millis(20)).await; + + // Simulate potential critical failure + if operation_name == "fail_critical" { + return Err(anyhow!("Critical failure in {}", operation_name)); + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::core::testutil::fixtures::span_fixture; + + #[tokio::test] + async fn test_basic_cancellation_example() { + let result = basic_cancellation_example(&span_fixture()).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_startup_example() { + let result = startup_example(&span_fixture()).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_hierarchical_cancellation_example() { + let result = hierarchical_cancellation_example(&span_fixture()).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_error_propagation_example() { + let result = error_propagation_example(&span_fixture()).await; + assert!(result.is_ok()); + } +} \ No newline at end of file diff --git a/src/core/context/mod.rs b/src/core/context/mod.rs new file mode 100644 index 0000000..7a1ad9a --- /dev/null +++ b/src/core/context/mod.rs @@ -0,0 +1,240 @@ +//! Cancelable context with irrecoverable error propagation +//! +//! This module provides a simplified context implementation focused on: +//! - Cancellation support via tokio's CancellationToken +//! - Parent-child context hierarchies +//! - Irrecoverable error propagation that terminates the application +//! +//! Unlike the full Go context API, this implementation focuses only on the core +//! functionality needed: cancellation and error propagation. + +pub mod examples; + +use anyhow::Result; +use std::sync::Arc; +use tokio_util::sync::CancellationToken; +use tracing::Span; + +/// A cancelable context that supports parent-child hierarchies and irrecoverable error propagation. +/// +/// When an irrecoverable error is thrown, it propagates up to the root context and terminates the program. +/// Children automatically get cancelled when their parent is cancelled. +#[derive(Clone)] +pub struct CancelableContext { + inner: Arc, +} + +struct ContextInner { + token: CancellationToken, + parent: Option, + span: Span, +} + +impl CancelableContext { + /// Create a new root context + pub fn new(parent_span: &Span) -> Self { + let span = tracing::span!(parent: parent_span, tracing::Level::TRACE, "cancelable_context"); + + Self { + inner: Arc::new(ContextInner { + token: CancellationToken::new(), + parent: None, + span, + }), + } + } + + /// Create a child context that inherits cancellation from the parent + pub fn child(&self) -> Self { + let span = tracing::span!(parent: &self.inner.span, tracing::Level::TRACE, "cancelable_context_child"); + + Self { + inner: Arc::new(ContextInner { + token: self.inner.token.child_token(), + parent: Some(self.clone()), + span, + }), + } + } + + /// triggers Cancel in this context and all its children + /// to check if cancellation is complete, use `cancelled().await` + pub fn cancel(&self) { + let _enter = self.inner.span.enter(); + tracing::trace!("cancelling context"); + self.inner.token.cancel(); + } + + /// Check if the context is cancelled (non-blocking) + pub fn is_cancelled(&self) -> bool { + self.inner.token.is_cancelled() + } + + /// Wait for the context to be cancelled (async) + pub async fn cancelled(&self) { + self.inner.token.cancelled().await; + } + + /// Run an operation with cancellation support + /// If the context is cancelled before the operation completes, it returns an error. + /// otherwise, it returns the operation's result. + pub async fn run(&self, future: F) -> Result + where + F: std::future::Future>, + { + let _enter = self.inner.span.enter(); + + tokio::select! { + result = future => result, + _ = self.cancelled() => { + Err(anyhow::anyhow!("context cancelled")) + } + } + } + + /// Propagate an irrecoverable error up the context chain. + /// When it reaches the root context, it terminates the program. + /// there is no return from this function. + pub fn throw_irrecoverable(&self, err: anyhow::Error) -> ! { + let _enter = self.inner.span.enter(); + + // Propagate to parent if it exists + if let Some(parent) = &self.inner.parent { + tracing::trace!("propagating irrecoverable error to parent context"); + parent.throw_irrecoverable(err); + } + + // Root context - terminate the program + tracing::error!("irrecoverable error: {}", err); + std::process::exit(1); + } + + /// Run an operation, throwing irrecoverable error on failure + /// This is a convenience method that combines `run` and `throw_irrecoverable`. + /// If the operation succeeds, it returns the result. + /// If it fails, it propagates the error irrecoverably, terminating the program. + pub async fn run_or_throw(&self, future: F) -> T + where + F: std::future::Future>, + { + match self.run(future).await { + Ok(value) => value, + Err(err) => self.throw_irrecoverable(err), + } + } +} + +// Custom Debug implementation for better visibility into the context state +impl std::fmt::Debug for CancelableContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CancelableContext") + .field("is_cancelled", &self.is_cancelled()) + .field("has_parent", &self.inner.parent.is_some()) + .finish() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::core::testutil::fixtures::span_fixture; + use tokio::time::{sleep, Duration}; + + // this test ensures that cancelling a context works as expected + #[tokio::test] + async fn test_basic_cancellation() { + let ctx = CancelableContext::new(&span_fixture()); + + assert!(!ctx.is_cancelled()); + ctx.cancel(); + assert!(ctx.is_cancelled()); + } + + // this test ensures that cancelling a parent context cancels its children + #[tokio::test] + async fn test_child_cancellation() { + let parent = CancelableContext::new(&span_fixture()); + let child = parent.child(); + + assert!(!child.is_cancelled()); + parent.cancel(); // Cancel parent + + // Small delay for propagation + sleep(Duration::from_millis(1)).await; + assert!(child.is_cancelled()); + } + + // this test ensures that running an operation completes successfully if its context is not canceled + #[tokio::test] + async fn test_successful_operation() { + let ctx = CancelableContext::new(&span_fixture()); + + let result = ctx.run(async { + Ok::(42) + }).await; + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), 42); + } + + // this test ensures that running an operation respects cancellation + // the operation should not complete if the context is canceled + #[tokio::test] + async fn test_run_with_cancellation() { + let ctx = CancelableContext::new(&span_fixture()); + + // Cancel the context + ctx.cancel(); + + // Small delay to ensure cancellation is processed + sleep(Duration::from_millis(1)).await; + + let result = ctx.run(async { + // This should not execute due to cancellation + sleep(Duration::from_millis(10)).await; + Ok::(42) + + }).await; + + // The operation should return an error since the context was canceled before it could complete + assert!(result.is_err()); + // The error should indicate cancellation + assert!(result.unwrap_err().to_string().contains("context cancelled")); + } + + + // this test ensures that nested child contexts are canceled when the root context is canceled + #[tokio::test] + async fn test_nested_children() { + let root = CancelableContext::new(&span_fixture()); + let child1 = root.child(); + let child2 = child1.child(); + let grandchild = child2.child(); + + // Initially, none should be cancelled + assert!(!grandchild.is_cancelled()); + + // Cancel root - should propagate to all children + root.cancel(); + sleep(Duration::from_millis(1)).await; + + // All children contexts should now be cancelled + assert!(child1.is_cancelled()); + assert!(child2.is_cancelled()); + assert!(grandchild.is_cancelled()); + } + + // Test that we can create the error propagation hierarchy + // (We can't test throw_irrecoverable since it exits the program) + #[test] + fn test_error_propagation_structure() { + let root = CancelableContext::new(&span_fixture()); + let child = root.child(); + let grandchild = child.child(); + + // verify the parent chain exists + assert!(grandchild.inner.parent.is_some()); + assert!(child.inner.parent.is_some()); + assert!(root.inner.parent.is_none()); + } +} \ No newline at end of file diff --git a/src/core/mod.rs b/src/core/mod.rs index 15e7d84..a869718 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -1,8 +1,10 @@ +pub mod context; mod lookup; pub mod model; #[cfg(test)] pub mod testutil; +pub use crate::core::context::CancelableContext; pub use crate::core::lookup::array_lookup_table::ArrayLookupTable; pub use crate::core::lookup::array_lookup_table::LOOKUP_TABLE_LEVELS; pub use crate::core::lookup::LookupTable; diff --git a/src/node/base_node.rs b/src/node/base_node.rs index 4139f9d..b2b8bdb 100644 --- a/src/node/base_node.rs +++ b/src/node/base_node.rs @@ -1,6 +1,6 @@ use crate::core::model::direction::Direction; use crate::core::{ - Identifier, IdSearchReq, IdSearchRes, LookupTable, MembershipVector, + CancelableContext, Identifier, IdSearchReq, IdSearchRes, LookupTable, MembershipVector, }; #[cfg(test)] // TODO: Remove once BaseNode is used in production code. use crate::network::MessageProcessor; @@ -21,6 +21,7 @@ pub(crate) struct BaseNode { lt: Box, net: Box, span: Span, + ctx: CancelableContext, } impl Node for BaseNode { @@ -217,6 +218,11 @@ impl EventProcessorCore for BaseNode { } impl BaseNode { + /// Get a reference to the cancelable context for this node. + pub fn context(&self) -> &CancelableContext { + &self.ctx + } + /// Create a new `BaseNode` with the provided identifier, membership vector /// and lookup table. #[cfg(test)] // TODO: Remove once BaseNode is used in production code. @@ -231,6 +237,9 @@ impl BaseNode { let span = tracing::span!(parent: parent_span, tracing::Level::TRACE, "base_node"); let _enter = span.enter(); + // Create a cancelable context for this node + let ctx = CancelableContext::new(&span); + tracing::trace!( "creating BaseNode with id {:?}, mem_vec {:?}", id, @@ -243,6 +252,7 @@ impl BaseNode { lt, net, span: span.clone(), + ctx, }; // Create a MessageProcessor from this node, instead of casting directly @@ -253,9 +263,11 @@ impl BaseNode { id ); - clone_net - .register_processor(processor) - .map_err(|e| anyhow!("could not register node in network: {}", e))?; + // Use throw_irrecoverable for critical startup failures + if let Err(e) = clone_net.register_processor(processor) { + let error = anyhow!("could not register node in network: {}", e); + node.ctx.throw_irrecoverable(error); + } tracing::trace!( "successfully created and registered BaseNode {:?}", @@ -272,7 +284,7 @@ impl BaseNode { impl PartialEq for BaseNode { fn eq(&self, other: &Self) -> bool { self.id == other.id && self.mem_vec == other.mem_vec - // ignore lt for equality check as comparing trait objects is non-trivial + // ignore lt and ctx for equality check as comparing trait objects is non-trivial } } @@ -293,6 +305,7 @@ impl Clone for BaseNode { lt: self.lt.clone(), net: self.net.clone(), span: self.span.clone(), + ctx: self.ctx.clone(), } } } @@ -310,12 +323,14 @@ mod tests { fn test_base_node() { let id = random_identifier(); let mem_vec = random_membership_vector(); + let span = span_fixture(); let node = BaseNode { id, mem_vec, - lt: Box::new(ArrayLookupTable::new(&span_fixture())), + lt: Box::new(ArrayLookupTable::new(&span)), net: Box::new(Unimock::new(())), // No expectations needed for direct struct construction - span: span_fixture(), + span: span.clone(), + ctx: CancelableContext::new(&span), }; assert_eq!(node.get_identifier(), &id); assert_eq!(node.get_membership_vector(), &mem_vec); From d39ac3cb0614b3c55a5953e6dc96b47ee1f56a6c Mon Sep 17 00:00:00 2001 From: sanaz Date: Fri, 12 Sep 2025 12:34:50 -0700 Subject: [PATCH 02/17] marks context method as unused to resolve linter complaint --- src/node/base_node.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/node/base_node.rs b/src/node/base_node.rs index b2b8bdb..fde6118 100644 --- a/src/node/base_node.rs +++ b/src/node/base_node.rs @@ -219,6 +219,7 @@ impl EventProcessorCore for BaseNode { impl BaseNode { /// Get a reference to the cancelable context for this node. + #[allow(dead_code)] // for now, but should remove it in the future pub fn context(&self) -> &CancelableContext { &self.ctx } From 31caa516295481b8bdfbb3cf23dd89824bde0b58 Mon Sep 17 00:00:00 2001 From: sanaz Date: Tue, 16 Sep 2025 09:53:31 -0700 Subject: [PATCH 03/17] renames CancelableContext to IrrevocableContext --- src/core/context/examples.rs | 12 ++++++------ src/core/context/mod.rs | 26 +++++++++++++------------- src/core/mod.rs | 2 +- src/node/base_node.rs | 10 +++++----- 4 files changed, 25 insertions(+), 25 deletions(-) diff --git a/src/core/context/examples.rs b/src/core/context/examples.rs index f183326..7828b9b 100644 --- a/src/core/context/examples.rs +++ b/src/core/context/examples.rs @@ -1,16 +1,16 @@ -//! Examples demonstrating CancelableContext usage patterns +//! Examples demonstrating IrrevocableContext usage patterns //! //! This module shows how to use the simplified cancelable context for //! cancellation and irrecoverable error handling. -use crate::core::context::CancelableContext; +use crate::core::context::IrrevocableContext; use anyhow::{anyhow, Result}; use tokio::time::{sleep, Duration}; use tracing::Span; /// Example: Basic cancellation usage pub async fn basic_cancellation_example(parent_span: &Span) -> Result<()> { - let ctx = CancelableContext::new(parent_span); + let ctx = IrrevocableContext::new(parent_span); let child_ctx = ctx.child(); // Spawn background work with child context @@ -63,7 +63,7 @@ pub async fn basic_cancellation_example(parent_span: &Span) -> Result<()> { /// All operations are critical - if any fail, the program should terminate /// In this example, all operations succeed for demonstration purposes pub async fn startup_example(parent_span: &Span) -> Result<()> { - let ctx = CancelableContext::new(parent_span); + let ctx = IrrevocableContext::new(parent_span); let startup_operations = vec![ "initialize_network", @@ -85,7 +85,7 @@ pub async fn startup_example(parent_span: &Span) -> Result<()> { /// Example: Hierarchical cancellation pub async fn hierarchical_cancellation_example(parent_span: &Span) -> Result<()> { - let root_ctx = CancelableContext::new(parent_span); + let root_ctx = IrrevocableContext::new(parent_span); let service1_ctx = root_ctx.child(); let service2_ctx = root_ctx.child(); @@ -143,7 +143,7 @@ pub async fn hierarchical_cancellation_example(parent_span: &Span) -> Result<()> /// Example: Error propagation pattern pub async fn error_propagation_example(parent_span: &Span) -> Result<()> { - let root_ctx = CancelableContext::new(parent_span); + let root_ctx = IrrevocableContext::new(parent_span); let child_ctx = root_ctx.child(); let _grandchild_ctx = child_ctx.child(); diff --git a/src/core/context/mod.rs b/src/core/context/mod.rs index 7a1ad9a..1b80c7b 100644 --- a/src/core/context/mod.rs +++ b/src/core/context/mod.rs @@ -20,20 +20,20 @@ use tracing::Span; /// When an irrecoverable error is thrown, it propagates up to the root context and terminates the program. /// Children automatically get cancelled when their parent is cancelled. #[derive(Clone)] -pub struct CancelableContext { +pub struct IrrevocableContext { inner: Arc, } struct ContextInner { token: CancellationToken, - parent: Option, + parent: Option, span: Span, } -impl CancelableContext { +impl IrrevocableContext { /// Create a new root context pub fn new(parent_span: &Span) -> Self { - let span = tracing::span!(parent: parent_span, tracing::Level::TRACE, "cancelable_context"); + let span = tracing::span!(parent: parent_span, tracing::Level::TRACE, "irrevocable_context"); Self { inner: Arc::new(ContextInner { @@ -46,7 +46,7 @@ impl CancelableContext { /// Create a child context that inherits cancellation from the parent pub fn child(&self) -> Self { - let span = tracing::span!(parent: &self.inner.span, tracing::Level::TRACE, "cancelable_context_child"); + let span = tracing::span!(parent: &self.inner.span, tracing::Level::TRACE, "irrevocable_context_child"); Self { inner: Arc::new(ContextInner { @@ -125,9 +125,9 @@ impl CancelableContext { } // Custom Debug implementation for better visibility into the context state -impl std::fmt::Debug for CancelableContext { +impl std::fmt::Debug for IrrevocableContext { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("CancelableContext") + f.debug_struct("IrrevocableContext") .field("is_cancelled", &self.is_cancelled()) .field("has_parent", &self.inner.parent.is_some()) .finish() @@ -143,7 +143,7 @@ mod tests { // this test ensures that cancelling a context works as expected #[tokio::test] async fn test_basic_cancellation() { - let ctx = CancelableContext::new(&span_fixture()); + let ctx = IrrevocableContext::new(&span_fixture()); assert!(!ctx.is_cancelled()); ctx.cancel(); @@ -153,7 +153,7 @@ mod tests { // this test ensures that cancelling a parent context cancels its children #[tokio::test] async fn test_child_cancellation() { - let parent = CancelableContext::new(&span_fixture()); + let parent = IrrevocableContext::new(&span_fixture()); let child = parent.child(); assert!(!child.is_cancelled()); @@ -167,7 +167,7 @@ mod tests { // this test ensures that running an operation completes successfully if its context is not canceled #[tokio::test] async fn test_successful_operation() { - let ctx = CancelableContext::new(&span_fixture()); + let ctx = IrrevocableContext::new(&span_fixture()); let result = ctx.run(async { Ok::(42) @@ -181,7 +181,7 @@ mod tests { // the operation should not complete if the context is canceled #[tokio::test] async fn test_run_with_cancellation() { - let ctx = CancelableContext::new(&span_fixture()); + let ctx = IrrevocableContext::new(&span_fixture()); // Cancel the context ctx.cancel(); @@ -206,7 +206,7 @@ mod tests { // this test ensures that nested child contexts are canceled when the root context is canceled #[tokio::test] async fn test_nested_children() { - let root = CancelableContext::new(&span_fixture()); + let root = IrrevocableContext::new(&span_fixture()); let child1 = root.child(); let child2 = child1.child(); let grandchild = child2.child(); @@ -228,7 +228,7 @@ mod tests { // (We can't test throw_irrecoverable since it exits the program) #[test] fn test_error_propagation_structure() { - let root = CancelableContext::new(&span_fixture()); + let root = IrrevocableContext::new(&span_fixture()); let child = root.child(); let grandchild = child.child(); diff --git a/src/core/mod.rs b/src/core/mod.rs index a869718..323c974 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -4,7 +4,7 @@ pub mod model; #[cfg(test)] pub mod testutil; -pub use crate::core::context::CancelableContext; +pub use crate::core::context::IrrevocableContext; pub use crate::core::lookup::array_lookup_table::ArrayLookupTable; pub use crate::core::lookup::array_lookup_table::LOOKUP_TABLE_LEVELS; pub use crate::core::lookup::LookupTable; diff --git a/src/node/base_node.rs b/src/node/base_node.rs index fde6118..899c365 100644 --- a/src/node/base_node.rs +++ b/src/node/base_node.rs @@ -1,6 +1,6 @@ use crate::core::model::direction::Direction; use crate::core::{ - CancelableContext, Identifier, IdSearchReq, IdSearchRes, LookupTable, MembershipVector, + IrrevocableContext, Identifier, IdSearchReq, IdSearchRes, LookupTable, MembershipVector, }; #[cfg(test)] // TODO: Remove once BaseNode is used in production code. use crate::network::MessageProcessor; @@ -21,7 +21,7 @@ pub(crate) struct BaseNode { lt: Box, net: Box, span: Span, - ctx: CancelableContext, + ctx: IrrevocableContext, } impl Node for BaseNode { @@ -220,7 +220,7 @@ impl EventProcessorCore for BaseNode { impl BaseNode { /// Get a reference to the cancelable context for this node. #[allow(dead_code)] // for now, but should remove it in the future - pub fn context(&self) -> &CancelableContext { + pub fn context(&self) -> &IrrevocableContext { &self.ctx } @@ -239,7 +239,7 @@ impl BaseNode { let _enter = span.enter(); // Create a cancelable context for this node - let ctx = CancelableContext::new(&span); + let ctx = IrrevocableContext::new(&span); tracing::trace!( "creating BaseNode with id {:?}, mem_vec {:?}", @@ -331,7 +331,7 @@ mod tests { lt: Box::new(ArrayLookupTable::new(&span)), net: Box::new(Unimock::new(())), // No expectations needed for direct struct construction span: span.clone(), - ctx: CancelableContext::new(&span), + ctx: IrrevocableContext::new(&span), }; assert_eq!(node.get_identifier(), &id); assert_eq!(node.get_membership_vector(), &mem_vec); From 70a19c9a65e251fcfce4ab72e39c4bdd4e673b06 Mon Sep 17 00:00:00 2001 From: sanaz Date: Tue, 16 Sep 2025 10:05:37 -0700 Subject: [PATCH 04/17] adds tag argument to child and new methods --- src/core/context/examples.rs | 18 +++++++++--------- src/core/context/mod.rs | 33 ++++++++++++++++----------------- src/node/base_node.rs | 4 ++-- 3 files changed, 27 insertions(+), 28 deletions(-) diff --git a/src/core/context/examples.rs b/src/core/context/examples.rs index 7828b9b..2eee0c9 100644 --- a/src/core/context/examples.rs +++ b/src/core/context/examples.rs @@ -10,8 +10,8 @@ use tracing::Span; /// Example: Basic cancellation usage pub async fn basic_cancellation_example(parent_span: &Span) -> Result<()> { - let ctx = IrrevocableContext::new(parent_span); - let child_ctx = ctx.child(); + let ctx = IrrevocableContext::new(parent_span, "basic_cancellation_example"); + let child_ctx = ctx.child("worker_task"); // Spawn background work with child context // this will start in a separate thread @@ -63,7 +63,7 @@ pub async fn basic_cancellation_example(parent_span: &Span) -> Result<()> { /// All operations are critical - if any fail, the program should terminate /// In this example, all operations succeed for demonstration purposes pub async fn startup_example(parent_span: &Span) -> Result<()> { - let ctx = IrrevocableContext::new(parent_span); + let ctx = IrrevocableContext::new(parent_span, "startup_example"); let startup_operations = vec![ "initialize_network", @@ -85,9 +85,9 @@ pub async fn startup_example(parent_span: &Span) -> Result<()> { /// Example: Hierarchical cancellation pub async fn hierarchical_cancellation_example(parent_span: &Span) -> Result<()> { - let root_ctx = IrrevocableContext::new(parent_span); - let service1_ctx = root_ctx.child(); - let service2_ctx = root_ctx.child(); + let root_ctx = IrrevocableContext::new(parent_span, "hierarchical_cancellation_example"); + let service1_ctx = root_ctx.child("service1"); + let service2_ctx = root_ctx.child("service2"); // Spawn multiple services let service1_handle = tokio::spawn({ @@ -143,9 +143,9 @@ pub async fn hierarchical_cancellation_example(parent_span: &Span) -> Result<()> /// Example: Error propagation pattern pub async fn error_propagation_example(parent_span: &Span) -> Result<()> { - let root_ctx = IrrevocableContext::new(parent_span); - let child_ctx = root_ctx.child(); - let _grandchild_ctx = child_ctx.child(); + let root_ctx = IrrevocableContext::new(parent_span, "error_propagation_example"); + let child_ctx = root_ctx.child("child_context"); + let _grandchild_ctx = child_ctx.child("grandchild_context"); // Simulate a critical error in deeply nested operation let critical_error = anyhow!("Critical database connection failed"); diff --git a/src/core/context/mod.rs b/src/core/context/mod.rs index 1b80c7b..3b261a9 100644 --- a/src/core/context/mod.rs +++ b/src/core/context/mod.rs @@ -32,8 +32,8 @@ struct ContextInner { impl IrrevocableContext { /// Create a new root context - pub fn new(parent_span: &Span) -> Self { - let span = tracing::span!(parent: parent_span, tracing::Level::TRACE, "irrevocable_context"); + pub fn new(parent_span: &Span, tag: &str) -> Self { + let span = tracing::span!(parent: parent_span, tracing::Level::TRACE, "irrevocable_context", tag = tag); Self { inner: Arc::new(ContextInner { @@ -45,9 +45,8 @@ impl IrrevocableContext { } /// Create a child context that inherits cancellation from the parent - pub fn child(&self) -> Self { - let span = tracing::span!(parent: &self.inner.span, tracing::Level::TRACE, "irrevocable_context_child"); - + pub fn child(&self, tag: &str) -> Self { + let span = tracing::span!(parent: &self.inner.span, tracing::Level::TRACE, "irrevocable_context_child", tag = tag); Self { inner: Arc::new(ContextInner { token: self.inner.token.child_token(), @@ -143,7 +142,7 @@ mod tests { // this test ensures that cancelling a context works as expected #[tokio::test] async fn test_basic_cancellation() { - let ctx = IrrevocableContext::new(&span_fixture()); + let ctx = IrrevocableContext::new(&span_fixture(), "test_context"); assert!(!ctx.is_cancelled()); ctx.cancel(); @@ -153,8 +152,8 @@ mod tests { // this test ensures that cancelling a parent context cancels its children #[tokio::test] async fn test_child_cancellation() { - let parent = IrrevocableContext::new(&span_fixture()); - let child = parent.child(); + let parent = IrrevocableContext::new(&span_fixture(), "test_context"); + let child = parent.child("test_child"); assert!(!child.is_cancelled()); parent.cancel(); // Cancel parent @@ -167,7 +166,7 @@ mod tests { // this test ensures that running an operation completes successfully if its context is not canceled #[tokio::test] async fn test_successful_operation() { - let ctx = IrrevocableContext::new(&span_fixture()); + let ctx = IrrevocableContext::new(&span_fixture(), "test_context"); let result = ctx.run(async { Ok::(42) @@ -181,7 +180,7 @@ mod tests { // the operation should not complete if the context is canceled #[tokio::test] async fn test_run_with_cancellation() { - let ctx = IrrevocableContext::new(&span_fixture()); + let ctx = IrrevocableContext::new(&span_fixture(), "test_context"); // Cancel the context ctx.cancel(); @@ -206,10 +205,10 @@ mod tests { // this test ensures that nested child contexts are canceled when the root context is canceled #[tokio::test] async fn test_nested_children() { - let root = IrrevocableContext::new(&span_fixture()); - let child1 = root.child(); - let child2 = child1.child(); - let grandchild = child2.child(); + let root = IrrevocableContext::new(&span_fixture(), "test_nested_children_root"); + let child1 = root.child("child1"); + let child2 = child1.child("child2"); + let grandchild = child2.child("grandchild"); // Initially, none should be cancelled assert!(!grandchild.is_cancelled()); @@ -228,9 +227,9 @@ mod tests { // (We can't test throw_irrecoverable since it exits the program) #[test] fn test_error_propagation_structure() { - let root = IrrevocableContext::new(&span_fixture()); - let child = root.child(); - let grandchild = child.child(); + let root = IrrevocableContext::new(&span_fixture(), "test_error_propagation_root"); + let child = root.child("error_prop_child"); + let grandchild = child.child("error_prop_grandchild"); // verify the parent chain exists assert!(grandchild.inner.parent.is_some()); diff --git a/src/node/base_node.rs b/src/node/base_node.rs index 899c365..89759e7 100644 --- a/src/node/base_node.rs +++ b/src/node/base_node.rs @@ -239,7 +239,7 @@ impl BaseNode { let _enter = span.enter(); // Create a cancelable context for this node - let ctx = IrrevocableContext::new(&span); + let ctx = IrrevocableContext::new(&span, "base_node_context"); tracing::trace!( "creating BaseNode with id {:?}, mem_vec {:?}", @@ -331,7 +331,7 @@ mod tests { lt: Box::new(ArrayLookupTable::new(&span)), net: Box::new(Unimock::new(())), // No expectations needed for direct struct construction span: span.clone(), - ctx: IrrevocableContext::new(&span), + ctx: IrrevocableContext::new(&span, "base_node_test_context"), }; assert_eq!(node.get_identifier(), &id); assert_eq!(node.get_membership_vector(), &mem_vec); From dbcd5629dcfc04abccf8514445daa3d63fbcc624 Mon Sep 17 00:00:00 2001 From: sanaz Date: Tue, 16 Sep 2025 10:06:17 -0700 Subject: [PATCH 05/17] deletes is_cancelled --- src/core/context/mod.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/core/context/mod.rs b/src/core/context/mod.rs index 3b261a9..8f9a0cf 100644 --- a/src/core/context/mod.rs +++ b/src/core/context/mod.rs @@ -64,11 +64,6 @@ impl IrrevocableContext { self.inner.token.cancel(); } - /// Check if the context is cancelled (non-blocking) - pub fn is_cancelled(&self) -> bool { - self.inner.token.is_cancelled() - } - /// Wait for the context to be cancelled (async) pub async fn cancelled(&self) { self.inner.token.cancelled().await; From 158b48c798e397ef633615326bdf76f7eec11e22 Mon Sep 17 00:00:00 2001 From: sanaz Date: Tue, 16 Sep 2025 10:07:21 -0700 Subject: [PATCH 06/17] log level has been updated to error --- src/core/context/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/context/mod.rs b/src/core/context/mod.rs index 8f9a0cf..842ebda 100644 --- a/src/core/context/mod.rs +++ b/src/core/context/mod.rs @@ -94,7 +94,7 @@ impl IrrevocableContext { // Propagate to parent if it exists if let Some(parent) = &self.inner.parent { - tracing::trace!("propagating irrecoverable error to parent context"); + tracing::error!("propagating irrecoverable error to parent context"); parent.throw_irrecoverable(err); } From 9a03f62192b6f64f47a29b17cde4f8e91181a064 Mon Sep 17 00:00:00 2001 From: sanaz Date: Tue, 16 Sep 2025 10:11:45 -0700 Subject: [PATCH 07/17] panics instead of exit --- src/core/context/mod.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/core/context/mod.rs b/src/core/context/mod.rs index 842ebda..a9ee096 100644 --- a/src/core/context/mod.rs +++ b/src/core/context/mod.rs @@ -64,6 +64,11 @@ impl IrrevocableContext { self.inner.token.cancel(); } + /// Check if the context is cancelled (non-blocking) + pub fn is_cancelled(&self) -> bool { + self.inner.token.is_cancelled() + } + /// Wait for the context to be cancelled (async) pub async fn cancelled(&self) { self.inner.token.cancelled().await; @@ -98,9 +103,8 @@ impl IrrevocableContext { parent.throw_irrecoverable(err); } - // Root context - terminate the program - tracing::error!("irrecoverable error: {}", err); - std::process::exit(1); + // Root context - panic with the error + panic!("irrecoverable error: {}", err); } /// Run an operation, throwing irrecoverable error on failure From 091e29e96b6e625646920467a05018f30df811d7 Mon Sep 17 00:00:00 2001 From: sanaz Date: Tue, 16 Sep 2025 10:17:58 -0700 Subject: [PATCH 08/17] marks is_cancelled as private --- src/core/context/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/context/mod.rs b/src/core/context/mod.rs index a9ee096..0a62a3a 100644 --- a/src/core/context/mod.rs +++ b/src/core/context/mod.rs @@ -64,8 +64,8 @@ impl IrrevocableContext { self.inner.token.cancel(); } - /// Check if the context is cancelled (non-blocking) - pub fn is_cancelled(&self) -> bool { + /// Check if the context is cancelled (non-blocking, private) + fn is_cancelled(&self) -> bool { self.inner.token.is_cancelled() } From a0608c07e3d95e192db72e29b32974ba5a9e079d Mon Sep 17 00:00:00 2001 From: sanaz Date: Tue, 16 Sep 2025 10:23:31 -0700 Subject: [PATCH 09/17] implements shallow clone, and updated claude guideline --- CLAUDE.md | 49 +++++++++++++++++++++++++++++++++++++++++ src/core/context/mod.rs | 15 ++++++++++++- 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/CLAUDE.md b/CLAUDE.md index 276856f..c06094c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -110,6 +110,55 @@ impl Clone for Box { **Why Not Copy**: Copy requires all fields to be Copy, doesn't work with trait objects, and semantically implies independent data rather than shared ownership. +### Explicit Clone Implementation for Future-Proof Shallow Cloning + +**Principle**: For logic-handling structures that use shallow cloning, implement `Clone` explicitly rather than deriving it. This prevents accidental deep cloning if future changes introduce non-shallow-clonable fields. + +**Implementation Pattern**: +```rust +// GOOD: Explicit Clone implementation +pub struct LogicStruct { + inner: Arc>, + // If future changes add fields that shouldn't be deep cloned, + // this implementation will need explicit updating +} + +// Remove #[derive(Clone)] and implement manually +impl Clone for LogicStruct { + fn clone(&self) -> Self { + // Shallow clone: cloned instances share the same underlying data via Arc + // This comment makes the intent explicit and serves as a reminder + // for future maintainers + LogicStruct { + inner: Arc::clone(&self.inner), + // Any new fields added here must maintain shallow cloning semantics + } + } +} +``` + +**Avoid Pattern**: +```rust +// AVOID: Derived Clone that may become deep cloning +#[derive(Clone)] // Dangerous if future fields are added +pub struct LogicStruct { + inner: Arc>, +} +``` + +**Benefits**: +- **Future-proof**: Adding new fields requires explicit decision about cloning behavior +- **Intent clarity**: Makes shallow cloning semantics explicit in code +- **Compile-time safety**: Non-clonable fields will cause compilation errors, forcing deliberate handling +- **Documentation**: Comments explain the shallow cloning contract + +**Reference Implementation**: See `IrrevocableContext` in `src/core/context/mod.rs` + +**When to Apply**: +- All logic-handling structures using Arc-based shared state +- Structures where shallow cloning semantics are critical to correctness +- Types that may evolve to include complex state in the future + ### Internal Thread Safety Pattern **Principle**: Prefer internal thread safety over external mutual exclusion. Structures should be internally thread-safe using Arc> patterns rather than requiring external Arc> wrapping. diff --git a/src/core/context/mod.rs b/src/core/context/mod.rs index 0a62a3a..429ea0b 100644 --- a/src/core/context/mod.rs +++ b/src/core/context/mod.rs @@ -19,7 +19,6 @@ use tracing::Span; /// /// When an irrecoverable error is thrown, it propagates up to the root context and terminates the program. /// Children automatically get cancelled when their parent is cancelled. -#[derive(Clone)] pub struct IrrevocableContext { inner: Arc, } @@ -132,6 +131,20 @@ impl std::fmt::Debug for IrrevocableContext { } } +/// Custom Clone implementation to ensure shallow cloning behavior. +/// This implementation explicitly controls cloning to ensure that: +/// - Only Arc pointer is cloned (shallow), not the underlying data +/// - If future changes add non-shallow-clonable fields, this implementation +/// must be updated to maintain the shallow cloning semantics +impl Clone for IrrevocableContext { + fn clone(&self) -> Self { + // Shallow clone: cloned instances share the same underlying data via Arc + IrrevocableContext { + inner: Arc::clone(&self.inner), + } + } +} + #[cfg(test)] mod tests { use super::*; From 5bdd4910e6cd0ae75b1b9e9b91874c74852c563d Mon Sep 17 00:00:00 2001 From: sanaz Date: Tue, 16 Sep 2025 10:28:30 -0700 Subject: [PATCH 10/17] ensure rust docs start with /// --- src/core/context/mod.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/core/context/mod.rs b/src/core/context/mod.rs index 429ea0b..b85d68c 100644 --- a/src/core/context/mod.rs +++ b/src/core/context/mod.rs @@ -151,7 +151,7 @@ mod tests { use crate::core::testutil::fixtures::span_fixture; use tokio::time::{sleep, Duration}; - // this test ensures that cancelling a context works as expected + /// this test ensures that cancelling a context works as expected #[tokio::test] async fn test_basic_cancellation() { let ctx = IrrevocableContext::new(&span_fixture(), "test_context"); @@ -161,7 +161,7 @@ mod tests { assert!(ctx.is_cancelled()); } - // this test ensures that cancelling a parent context cancels its children + /// this test ensures that cancelling a parent context cancels its children #[tokio::test] async fn test_child_cancellation() { let parent = IrrevocableContext::new(&span_fixture(), "test_context"); @@ -175,7 +175,7 @@ mod tests { assert!(child.is_cancelled()); } - // this test ensures that running an operation completes successfully if its context is not canceled + /// this test ensures that running an operation completes successfully if its context is not canceled #[tokio::test] async fn test_successful_operation() { let ctx = IrrevocableContext::new(&span_fixture(), "test_context"); @@ -188,8 +188,8 @@ mod tests { assert_eq!(result.unwrap(), 42); } - // this test ensures that running an operation respects cancellation - // the operation should not complete if the context is canceled + /// this test ensures that running an operation respects cancellation + /// the operation should not complete if the context is canceled #[tokio::test] async fn test_run_with_cancellation() { let ctx = IrrevocableContext::new(&span_fixture(), "test_context"); @@ -214,7 +214,7 @@ mod tests { } - // this test ensures that nested child contexts are canceled when the root context is canceled + /// this test ensures that nested child contexts are canceled when the root context is canceled #[tokio::test] async fn test_nested_children() { let root = IrrevocableContext::new(&span_fixture(), "test_nested_children_root"); @@ -235,8 +235,8 @@ mod tests { assert!(grandchild.is_cancelled()); } - // Test that we can create the error propagation hierarchy - // (We can't test throw_irrecoverable since it exits the program) + /// Test that we can create the error propagation hierarchy + /// (We can't test throw_irrecoverable since it exits the program) #[test] fn test_error_propagation_structure() { let root = IrrevocableContext::new(&span_fixture(), "test_error_propagation_root"); From 4c40d88c3d29afa198d67d214c60267c6ca48b34 Mon Sep 17 00:00:00 2001 From: sanaz Date: Wed, 17 Sep 2025 13:35:50 -0700 Subject: [PATCH 11/17] implements wait_until --- src/core/testutil/fixtures.rs | 60 +++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/src/core/testutil/fixtures.rs b/src/core/testutil/fixtures.rs index 487315d..f32b769 100644 --- a/src/core/testutil/fixtures.rs +++ b/src/core/testutil/fixtures.rs @@ -538,3 +538,63 @@ mod test { ); } } + +/// Waits until a condition becomes true within a specified timeout period. +/// This utility is designed for testing scenarios where you need to verify that +/// some asynchronous behavior occurs within a reasonable time frame. +/// +/// Uses a channel-based approach with cooperative +/// scheduling via `yield_now()` to be CPU-friendly while maintaining responsiveness. +/// +/// # Arguments +/// * `condition` - A closure that returns true when the expected condition is met +/// * `timeout` - Maximum time to wait for the condition to become true +/// +/// # Returns +/// * `Ok(())` if the condition becomes true within the timeout +/// * `Err(String)` if the timeout is exceeded before the condition is met +/// +/// # Example +/// ```ignore +/// // Wait for a context to be cancelled within 100ms +/// wait_until( +/// || context.is_cancelled(), +/// Duration::from_millis(100) +/// ).await.expect("context should be cancelled within 100ms"); +/// ``` +pub async fn wait_until( + mut condition: F, + timeout: Duration, +) -> Result<(), String> +where + F: FnMut() -> bool + Send + 'static, +{ + let (tx, rx) = tokio::sync::oneshot::channel::>(); + + // Spawn a single task that polls the condition and sends the result + let condition_task = tokio::task::spawn_blocking(move || { + loop { + if condition() { + // Condition met - send success and return + if tx.send(Ok(())).is_err() { + // Receiver dropped, but we're done anyway + } + return; + } + // Cooperative scheduling - yield to other threads + std::thread::yield_now(); + } + }); + + // Wait for the result with timeout + let result = match tokio::time::timeout(timeout, rx).await { + Ok(Ok(result)) => result, + Ok(Err(_)) => Err("channel closed unexpectedly".to_string()), + Err(_) => Err(format!("condition not met within timeout of {:?}", timeout)), + }; + + // Clean up the task if it's still running + condition_task.abort(); + + result +} From 8e297ede81e0b39dcd242e69e8abe0a874fb2bb4 Mon Sep 17 00:00:00 2001 From: sanaz Date: Wed, 17 Sep 2025 13:36:02 -0700 Subject: [PATCH 12/17] replaces sleeps with wait_until --- src/core/context/mod.rs | 36 +++++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/src/core/context/mod.rs b/src/core/context/mod.rs index b85d68c..c3ecea7 100644 --- a/src/core/context/mod.rs +++ b/src/core/context/mod.rs @@ -148,7 +148,7 @@ impl Clone for IrrevocableContext { #[cfg(test)] mod tests { use super::*; - use crate::core::testutil::fixtures::span_fixture; + use crate::core::testutil::fixtures::{span_fixture, wait_until}; use tokio::time::{sleep, Duration}; /// this test ensures that cancelling a context works as expected @@ -169,10 +169,13 @@ mod tests { assert!(!child.is_cancelled()); parent.cancel(); // Cancel parent - - // Small delay for propagation - sleep(Duration::from_millis(1)).await; - assert!(child.is_cancelled()); + + // Wait for cancellation to propagate + let child_clone = child.clone(); + wait_until( + move || child_clone.is_cancelled(), + Duration::from_millis(100) + ).await.expect("child context should be cancelled within 100ms"); } /// this test ensures that running an operation completes successfully if its context is not canceled @@ -196,9 +199,13 @@ mod tests { // Cancel the context ctx.cancel(); - - // Small delay to ensure cancellation is processed - sleep(Duration::from_millis(1)).await; + + // Wait for cancellation to be processed + let ctx_clone = ctx.clone(); + wait_until( + move || ctx_clone.is_cancelled(), + Duration::from_millis(100) + ).await.expect("context should be cancelled within 100ms"); let result = ctx.run(async { // This should not execute due to cancellation @@ -227,12 +234,15 @@ mod tests { // Cancel root - should propagate to all children root.cancel(); - sleep(Duration::from_millis(1)).await; - // All children contexts should now be cancelled - assert!(child1.is_cancelled()); - assert!(child2.is_cancelled()); - assert!(grandchild.is_cancelled()); + // Wait for cancellation to propagate to all children + let child1_clone = child1.clone(); + let child2_clone = child2.clone(); + let grandchild_clone = grandchild.clone(); + wait_until( + move || child1_clone.is_cancelled() && child2_clone.is_cancelled() && grandchild_clone.is_cancelled(), + Duration::from_millis(100) + ).await.expect("all child contexts should be cancelled within 100ms"); } /// Test that we can create the error propagation hierarchy From a646ff4de1c1ccd4ddbfc11fb2c08c387ca64e79 Mon Sep 17 00:00:00 2001 From: sanaz Date: Thu, 18 Sep 2025 10:32:05 -0700 Subject: [PATCH 13/17] refactors tests to use wait_until --- src/core/context/mod.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/core/context/mod.rs b/src/core/context/mod.rs index c3ecea7..4bcd108 100644 --- a/src/core/context/mod.rs +++ b/src/core/context/mod.rs @@ -155,10 +155,16 @@ mod tests { #[tokio::test] async fn test_basic_cancellation() { let ctx = IrrevocableContext::new(&span_fixture(), "test_context"); - + assert!(!ctx.is_cancelled()); ctx.cancel(); - assert!(ctx.is_cancelled()); + + // Wait until cancellation is processed + let ctx_clone = ctx.clone(); + wait_until( + move || ctx_clone.is_cancelled(), + Duration::from_millis(100) + ).await.expect("context should be cancelled within 100ms"); } /// this test ensures that cancelling a parent context cancels its children @@ -252,6 +258,8 @@ mod tests { let root = IrrevocableContext::new(&span_fixture(), "test_error_propagation_root"); let child = root.child("error_prop_child"); let grandchild = child.child("error_prop_grandchild"); + + grandchild.cancel(); // verify the parent chain exists assert!(grandchild.inner.parent.is_some()); From 01e3de0799261be23a1a69119e21f7d1d2947609 Mon Sep 17 00:00:00 2001 From: sanaz Date: Thu, 18 Sep 2025 11:10:26 -0700 Subject: [PATCH 14/17] refactors test to catch irrecoverable error thrown from throw_irrecoverable --- src/core/context/mod.rs | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/src/core/context/mod.rs b/src/core/context/mod.rs index 4bcd108..a694229 100644 --- a/src/core/context/mod.rs +++ b/src/core/context/mod.rs @@ -252,7 +252,6 @@ mod tests { } /// Test that we can create the error propagation hierarchy - /// (We can't test throw_irrecoverable since it exits the program) #[test] fn test_error_propagation_structure() { let root = IrrevocableContext::new(&span_fixture(), "test_error_propagation_root"); @@ -260,10 +259,34 @@ mod tests { let grandchild = child.child("error_prop_grandchild"); grandchild.cancel(); - + // verify the parent chain exists assert!(grandchild.inner.parent.is_some()); assert!(child.inner.parent.is_some()); assert!(root.inner.parent.is_none()); } + + /// Test that throw_irrecoverable properly panics when called + #[test] + fn test_throw_irrecoverable_panics() { + let result = std::panic::catch_unwind(|| { + let root = IrrevocableContext::new(&span_fixture(), "test_throw_root"); + let child = root.child("test_throw_child"); + + // This should panic with the irrecoverable error message + child.throw_irrecoverable(anyhow::anyhow!("test irrecoverable error")); + }); + + // Verify that a panic occurred + assert!(result.is_err()); + + // Verify the panic message contains our error + if let Err(panic_payload) = result { + if let Some(panic_msg) = panic_payload.downcast_ref::() { + assert_eq!(panic_msg, "irrecoverable error: test irrecoverable error"); + } else{ + panic!("unexpected panic payload type"); + } + } + } } \ No newline at end of file From 39b5684009d71c627895f7951216b4f60396b6ba Mon Sep 17 00:00:00 2001 From: sanaz Date: Thu, 18 Sep 2025 11:21:46 -0700 Subject: [PATCH 15/17] splits test from the code --- src/core/context/context_test.rs | 145 ++++++++++++++++++++++++++++++ src/core/context/mod.rs | 148 +------------------------------ 2 files changed, 148 insertions(+), 145 deletions(-) create mode 100644 src/core/context/context_test.rs diff --git a/src/core/context/context_test.rs b/src/core/context/context_test.rs new file mode 100644 index 0000000..b91a55c --- /dev/null +++ b/src/core/context/context_test.rs @@ -0,0 +1,145 @@ +#[cfg(test)] +mod tests { + use crate::core::context::IrrevocableContext; + use crate::core::testutil::fixtures::{span_fixture, wait_until}; + use tokio::time::{sleep, Duration}; + + /// this test ensures that cancelling a context works as expected + #[tokio::test] + async fn test_basic_cancellation() { + let ctx = IrrevocableContext::new(&span_fixture(), "test_context"); + + assert!(!ctx.is_cancelled()); + ctx.cancel(); + + // Wait until cancellation is processed + let ctx_clone = ctx.clone(); + wait_until( + move || ctx_clone.is_cancelled(), + Duration::from_millis(100) + ).await.expect("context should be cancelled within 100ms"); + } + + /// this test ensures that cancelling a parent context cancels its children + #[tokio::test] + async fn test_child_cancellation() { + let parent = IrrevocableContext::new(&span_fixture(), "test_context"); + let child = parent.child("test_child"); + + assert!(!child.is_cancelled()); + parent.cancel(); // Cancel parent + + // Wait for cancellation to propagate + let child_clone = child.clone(); + wait_until( + move || child_clone.is_cancelled(), + Duration::from_millis(100) + ).await.expect("child context should be cancelled within 100ms"); + } + + /// this test ensures that running an operation completes successfully if its context is not canceled + #[tokio::test] + async fn test_successful_operation() { + let ctx = IrrevocableContext::new(&span_fixture(), "test_context"); + + let result = ctx.run(async { + Ok::(42) + }).await; + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), 42); + } + + /// this test ensures that running an operation respects cancellation + /// the operation should not complete if the context is canceled + #[tokio::test] + async fn test_run_with_cancellation() { + let ctx = IrrevocableContext::new(&span_fixture(), "test_context"); + + // Cancel the context + ctx.cancel(); + + // Wait for cancellation to be processed + let ctx_clone = ctx.clone(); + wait_until( + move || ctx_clone.is_cancelled(), + Duration::from_millis(100) + ).await.expect("context should be cancelled within 100ms"); + + let result = ctx.run(async { + // This should not execute due to cancellation + sleep(Duration::from_millis(10)).await; + Ok::(42) + + }).await; + + // The operation should return an error since the context was canceled before it could complete + assert!(result.is_err()); + // The error should indicate cancellation + assert!(result.unwrap_err().to_string().contains("context cancelled")); + } + + + /// this test ensures that nested child contexts are canceled when the root context is canceled + #[tokio::test] + async fn test_nested_children() { + let root = IrrevocableContext::new(&span_fixture(), "test_nested_children_root"); + let child1 = root.child("child1"); + let child2 = child1.child("child2"); + let grandchild = child2.child("grandchild"); + + // Initially, none should be cancelled + assert!(!grandchild.is_cancelled()); + + // Cancel root - should propagate to all children + root.cancel(); + + // Wait for cancellation to propagate to all children + let child1_clone = child1.clone(); + let child2_clone = child2.clone(); + let grandchild_clone = grandchild.clone(); + wait_until( + move || child1_clone.is_cancelled() && child2_clone.is_cancelled() && grandchild_clone.is_cancelled(), + Duration::from_millis(100) + ).await.expect("all child contexts should be cancelled within 100ms"); + } + + /// Test that we can create the error propagation hierarchy + #[test] + fn test_error_propagation_structure() { + let root = IrrevocableContext::new(&span_fixture(), "test_error_propagation_root"); + let child = root.child("error_prop_child"); + let grandchild = child.child("error_prop_grandchild"); + + grandchild.cancel(); + + // verify the parent chain exists + assert!(grandchild.inner.parent.is_some()); + assert!(child.inner.parent.is_some()); + assert!(root.inner.parent.is_none()); + } + + /// Test that throw_irrecoverable properly panics when called + #[test] + fn test_throw_irrecoverable_panics() { + let result = std::panic::catch_unwind(|| { + let root = IrrevocableContext::new(&span_fixture(), "test_throw_root"); + let child = root.child("test_throw_child"); + + // This should panic with the irrecoverable error message + child.throw_irrecoverable(anyhow::anyhow!("test irrecoverable error")); + }); + + // Verify that a panic occurred + assert!(result.is_err()); + + // Verify the panic message contains our error + if let Err(panic_payload) = result { + if let Some(panic_msg) = panic_payload.downcast_ref::() { + assert_eq!(panic_msg, "irrecoverable error: test irrecoverable error"); + } else{ + panic!("unexpected panic payload type"); + } + } + } +} \ No newline at end of file diff --git a/src/core/context/mod.rs b/src/core/context/mod.rs index a694229..f5cb534 100644 --- a/src/core/context/mod.rs +++ b/src/core/context/mod.rs @@ -10,6 +10,9 @@ pub mod examples; +#[cfg(test)] +mod context_test; + use anyhow::Result; use std::sync::Arc; use tokio_util::sync::CancellationToken; @@ -145,148 +148,3 @@ impl Clone for IrrevocableContext { } } -#[cfg(test)] -mod tests { - use super::*; - use crate::core::testutil::fixtures::{span_fixture, wait_until}; - use tokio::time::{sleep, Duration}; - - /// this test ensures that cancelling a context works as expected - #[tokio::test] - async fn test_basic_cancellation() { - let ctx = IrrevocableContext::new(&span_fixture(), "test_context"); - - assert!(!ctx.is_cancelled()); - ctx.cancel(); - - // Wait until cancellation is processed - let ctx_clone = ctx.clone(); - wait_until( - move || ctx_clone.is_cancelled(), - Duration::from_millis(100) - ).await.expect("context should be cancelled within 100ms"); - } - - /// this test ensures that cancelling a parent context cancels its children - #[tokio::test] - async fn test_child_cancellation() { - let parent = IrrevocableContext::new(&span_fixture(), "test_context"); - let child = parent.child("test_child"); - - assert!(!child.is_cancelled()); - parent.cancel(); // Cancel parent - - // Wait for cancellation to propagate - let child_clone = child.clone(); - wait_until( - move || child_clone.is_cancelled(), - Duration::from_millis(100) - ).await.expect("child context should be cancelled within 100ms"); - } - - /// this test ensures that running an operation completes successfully if its context is not canceled - #[tokio::test] - async fn test_successful_operation() { - let ctx = IrrevocableContext::new(&span_fixture(), "test_context"); - - let result = ctx.run(async { - Ok::(42) - }).await; - - assert!(result.is_ok()); - assert_eq!(result.unwrap(), 42); - } - - /// this test ensures that running an operation respects cancellation - /// the operation should not complete if the context is canceled - #[tokio::test] - async fn test_run_with_cancellation() { - let ctx = IrrevocableContext::new(&span_fixture(), "test_context"); - - // Cancel the context - ctx.cancel(); - - // Wait for cancellation to be processed - let ctx_clone = ctx.clone(); - wait_until( - move || ctx_clone.is_cancelled(), - Duration::from_millis(100) - ).await.expect("context should be cancelled within 100ms"); - - let result = ctx.run(async { - // This should not execute due to cancellation - sleep(Duration::from_millis(10)).await; - Ok::(42) - - }).await; - - // The operation should return an error since the context was canceled before it could complete - assert!(result.is_err()); - // The error should indicate cancellation - assert!(result.unwrap_err().to_string().contains("context cancelled")); - } - - - /// this test ensures that nested child contexts are canceled when the root context is canceled - #[tokio::test] - async fn test_nested_children() { - let root = IrrevocableContext::new(&span_fixture(), "test_nested_children_root"); - let child1 = root.child("child1"); - let child2 = child1.child("child2"); - let grandchild = child2.child("grandchild"); - - // Initially, none should be cancelled - assert!(!grandchild.is_cancelled()); - - // Cancel root - should propagate to all children - root.cancel(); - - // Wait for cancellation to propagate to all children - let child1_clone = child1.clone(); - let child2_clone = child2.clone(); - let grandchild_clone = grandchild.clone(); - wait_until( - move || child1_clone.is_cancelled() && child2_clone.is_cancelled() && grandchild_clone.is_cancelled(), - Duration::from_millis(100) - ).await.expect("all child contexts should be cancelled within 100ms"); - } - - /// Test that we can create the error propagation hierarchy - #[test] - fn test_error_propagation_structure() { - let root = IrrevocableContext::new(&span_fixture(), "test_error_propagation_root"); - let child = root.child("error_prop_child"); - let grandchild = child.child("error_prop_grandchild"); - - grandchild.cancel(); - - // verify the parent chain exists - assert!(grandchild.inner.parent.is_some()); - assert!(child.inner.parent.is_some()); - assert!(root.inner.parent.is_none()); - } - - /// Test that throw_irrecoverable properly panics when called - #[test] - fn test_throw_irrecoverable_panics() { - let result = std::panic::catch_unwind(|| { - let root = IrrevocableContext::new(&span_fixture(), "test_throw_root"); - let child = root.child("test_throw_child"); - - // This should panic with the irrecoverable error message - child.throw_irrecoverable(anyhow::anyhow!("test irrecoverable error")); - }); - - // Verify that a panic occurred - assert!(result.is_err()); - - // Verify the panic message contains our error - if let Err(panic_payload) = result { - if let Some(panic_msg) = panic_payload.downcast_ref::() { - assert_eq!(panic_msg, "irrecoverable error: test irrecoverable error"); - } else{ - panic!("unexpected panic payload type"); - } - } - } -} \ No newline at end of file From 003416b42c6aa7e4d3f7af2d8cca66de341dbc96 Mon Sep 17 00:00:00 2001 From: sanaz Date: Thu, 18 Sep 2025 11:23:32 -0700 Subject: [PATCH 16/17] deletes examples.rs --- src/core/context/examples.rs | 206 ----------------------------------- src/core/context/mod.rs | 2 - 2 files changed, 208 deletions(-) delete mode 100644 src/core/context/examples.rs diff --git a/src/core/context/examples.rs b/src/core/context/examples.rs deleted file mode 100644 index 2eee0c9..0000000 --- a/src/core/context/examples.rs +++ /dev/null @@ -1,206 +0,0 @@ -//! Examples demonstrating IrrevocableContext usage patterns -//! -//! This module shows how to use the simplified cancelable context for -//! cancellation and irrecoverable error handling. - -use crate::core::context::IrrevocableContext; -use anyhow::{anyhow, Result}; -use tokio::time::{sleep, Duration}; -use tracing::Span; - -/// Example: Basic cancellation usage -pub async fn basic_cancellation_example(parent_span: &Span) -> Result<()> { - let ctx = IrrevocableContext::new(parent_span, "basic_cancellation_example"); - let child_ctx = ctx.child("worker_task"); - - // Spawn background work with child context - // this will start in a separate thread - // it runs for a few seconds unless cancelled - let work_handle = tokio::spawn( // spawn a new asynchronous task - { - // use a clone of the child context - let work_ctx = child_ctx.clone(); - async move { - work_ctx.run(async { - tracing::info!("Starting long-running operation..."); - sleep(Duration::from_secs(5)).await; - tracing::info!("Operation completed"); - Ok(()) - }).await - } - } - ); - - // Cancel after a short time - tokio::spawn({ - let cancel_ctx = ctx.clone(); - async move { - sleep(Duration::from_millis(100)).await; - tracing::info!("Cancelling operation"); - cancel_ctx.cancel(); - } - }); - - // Match against an expected outcome (cancellation) - match work_handle.await { - Ok(Err(_)) => { - tracing::info!("✓ Work was cancelled as expected"); - Ok(()) // SUCCESS: Cancellation worked - } - Ok(Ok(())) => { - tracing::error!("✗ Work completed but should have been cancelled"); - Err(anyhow!("Expected cancellation but work completed successfully")) - } - Err(e) => { - tracing::error!("✗ Work panicked unexpectedly: {}", e); - Err(anyhow!("Work panicked: {}", e)) - } - } - -} - -/// Example: Server startup with irrecoverable error handling -/// All operations are critical - if any fail, the program should terminate -/// In this example, all operations succeed for demonstration purposes -pub async fn startup_example(parent_span: &Span) -> Result<()> { - let ctx = IrrevocableContext::new(parent_span, "startup_example"); - - let startup_operations = vec![ - "initialize_network", - "load_configuration", - "setup_routing", - "register_services", - ]; - - for operation in startup_operations { - // Use run_or_throw for critical startup operations - // If any fails, the program terminates - ctx.run_or_throw(simulate_startup_operation(operation)).await; - tracing::info!("Completed startup operation: {}", operation); - } - - tracing::info!("Server startup completed successfully"); - Ok(()) -} - -/// Example: Hierarchical cancellation -pub async fn hierarchical_cancellation_example(parent_span: &Span) -> Result<()> { - let root_ctx = IrrevocableContext::new(parent_span, "hierarchical_cancellation_example"); - let service1_ctx = root_ctx.child("service1"); - let service2_ctx = root_ctx.child("service2"); - - // Spawn multiple services - let service1_handle = tokio::spawn({ - let ctx = service1_ctx.clone(); - async move { - ctx.run(async { - tracing::info!("Service 1 starting"); - sleep(Duration::from_secs(2)).await; - tracing::info!("Service 1 completed"); - Ok(()) - }).await - } - }); - - let service2_handle = tokio::spawn({ - let ctx = service2_ctx.clone(); - async move { - ctx.run(async { - tracing::info!("Service 2 starting"); - sleep(Duration::from_secs(2)).await; - tracing::info!("Service 2 completed"); - Ok(()) - }).await - } - }); - - // Cancel root after short time - should cancel all services - tokio::spawn({ - let ctx = root_ctx.clone(); - async move { - sleep(Duration::from_millis(50)).await; - tracing::info!("Shutting down all services"); - ctx.cancel(); - } - }); - - let (result1, result2) = tokio::join!(service1_handle, service2_handle); - - match result1 { - Ok(Ok(())) => tracing::info!("Service 1 completed"), - Ok(Err(e)) => tracing::info!("Service 1 cancelled: {}", e), - Err(e) => tracing::error!("Service 1 panicked: {}", e), - } - - match result2 { - Ok(Ok(())) => tracing::info!("Service 2 completed"), - Ok(Err(e)) => tracing::info!("Service 2 cancelled: {}", e), - Err(e) => tracing::error!("Service 2 panicked: {}", e), - } - - Ok(()) -} - -/// Example: Error propagation pattern -pub async fn error_propagation_example(parent_span: &Span) -> Result<()> { - let root_ctx = IrrevocableContext::new(parent_span, "error_propagation_example"); - let child_ctx = root_ctx.child("child_context"); - let _grandchild_ctx = child_ctx.child("grandchild_context"); - - // Simulate a critical error in deeply nested operation - let critical_error = anyhow!("Critical database connection failed"); - - // In a real scenario, uncommenting the line below would terminate the program: - // grandchild_ctx.throw_irrecoverable(critical_error); - - // For demo purposes, just log what would happen - tracing::info!("Would propagate error through context hierarchy: {}", critical_error); - tracing::info!("Error would bubble from grandchild -> child -> root -> program exit"); - - Ok(()) -} - -// Helper function to simulate startup operations -// Fails if operation_name is "fail_critical" -// otherwise succeeds after a short delay -async fn simulate_startup_operation(operation_name: &str) -> Result<()> { - // Simulate work - sleep(Duration::from_millis(20)).await; - - // Simulate potential critical failure - if operation_name == "fail_critical" { - return Err(anyhow!("Critical failure in {}", operation_name)); - } - - Ok(()) -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::core::testutil::fixtures::span_fixture; - - #[tokio::test] - async fn test_basic_cancellation_example() { - let result = basic_cancellation_example(&span_fixture()).await; - assert!(result.is_ok()); - } - - #[tokio::test] - async fn test_startup_example() { - let result = startup_example(&span_fixture()).await; - assert!(result.is_ok()); - } - - #[tokio::test] - async fn test_hierarchical_cancellation_example() { - let result = hierarchical_cancellation_example(&span_fixture()).await; - assert!(result.is_ok()); - } - - #[tokio::test] - async fn test_error_propagation_example() { - let result = error_propagation_example(&span_fixture()).await; - assert!(result.is_ok()); - } -} \ No newline at end of file diff --git a/src/core/context/mod.rs b/src/core/context/mod.rs index f5cb534..4166dab 100644 --- a/src/core/context/mod.rs +++ b/src/core/context/mod.rs @@ -8,8 +8,6 @@ //! Unlike the full Go context API, this implementation focuses only on the core //! functionality needed: cancellation and error propagation. -pub mod examples; - #[cfg(test)] mod context_test; From 1a032b5fe90698769562f97bbc98856a748c521d Mon Sep 17 00:00:00 2001 From: sanaz Date: Thu, 18 Sep 2025 11:28:42 -0700 Subject: [PATCH 17/17] removes context method from BaseNode --- src/node/base_node.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/node/base_node.rs b/src/node/base_node.rs index 89759e7..ff88655 100644 --- a/src/node/base_node.rs +++ b/src/node/base_node.rs @@ -218,12 +218,6 @@ impl EventProcessorCore for BaseNode { } impl BaseNode { - /// Get a reference to the cancelable context for this node. - #[allow(dead_code)] // for now, but should remove it in the future - pub fn context(&self) -> &IrrevocableContext { - &self.ctx - } - /// Create a new `BaseNode` with the provided identifier, membership vector /// and lookup table. #[cfg(test)] // TODO: Remove once BaseNode is used in production code.