diff --git a/CLAUDE.md b/CLAUDE.md index 276856f..c06094c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -110,6 +110,55 @@ impl Clone for Box { **Why Not Copy**: Copy requires all fields to be Copy, doesn't work with trait objects, and semantically implies independent data rather than shared ownership. +### Explicit Clone Implementation for Future-Proof Shallow Cloning + +**Principle**: For logic-handling structures that use shallow cloning, implement `Clone` explicitly rather than deriving it. This prevents accidental deep cloning if future changes introduce non-shallow-clonable fields. + +**Implementation Pattern**: +```rust +// GOOD: Explicit Clone implementation +pub struct LogicStruct { + inner: Arc>, + // If future changes add fields that shouldn't be deep cloned, + // this implementation will need explicit updating +} + +// Remove #[derive(Clone)] and implement manually +impl Clone for LogicStruct { + fn clone(&self) -> Self { + // Shallow clone: cloned instances share the same underlying data via Arc + // This comment makes the intent explicit and serves as a reminder + // for future maintainers + LogicStruct { + inner: Arc::clone(&self.inner), + // Any new fields added here must maintain shallow cloning semantics + } + } +} +``` + +**Avoid Pattern**: +```rust +// AVOID: Derived Clone that may become deep cloning +#[derive(Clone)] // Dangerous if future fields are added +pub struct LogicStruct { + inner: Arc>, +} +``` + +**Benefits**: +- **Future-proof**: Adding new fields requires explicit decision about cloning behavior +- **Intent clarity**: Makes shallow cloning semantics explicit in code +- **Compile-time safety**: Non-clonable fields will cause compilation errors, forcing deliberate handling +- **Documentation**: Comments explain the shallow cloning contract + +**Reference Implementation**: See `IrrevocableContext` in `src/core/context/mod.rs` + +**When to Apply**: +- All logic-handling structures using Arc-based shared state +- Structures where shallow cloning semantics are critical to correctness +- Types that may evolve to include complex state in the future + ### Internal Thread Safety Pattern **Principle**: Prefer internal thread safety over external mutual exclusion. Structures should be internally thread-safe using Arc> patterns rather than requiring external Arc> wrapping. diff --git a/Cargo.toml b/Cargo.toml index 546bcce..78cf720 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,4 +15,6 @@ tracing-subscriber = "0.3" tracing-appender = "0.2" tracing-log = "0.1" unimock = "0.6" -parking_lot = "0.12" \ No newline at end of file +parking_lot = "0.12" +tokio = { version = "1.0", features = ["sync", "time", "macros", "rt", "rt-multi-thread"] } +tokio-util = "0.7" \ No newline at end of file diff --git a/src/core/context/context_test.rs b/src/core/context/context_test.rs new file mode 100644 index 0000000..b91a55c --- /dev/null +++ b/src/core/context/context_test.rs @@ -0,0 +1,145 @@ +#[cfg(test)] +mod tests { + use crate::core::context::IrrevocableContext; + use crate::core::testutil::fixtures::{span_fixture, wait_until}; + use tokio::time::{sleep, Duration}; + + /// this test ensures that cancelling a context works as expected + #[tokio::test] + async fn test_basic_cancellation() { + let ctx = IrrevocableContext::new(&span_fixture(), "test_context"); + + assert!(!ctx.is_cancelled()); + ctx.cancel(); + + // Wait until cancellation is processed + let ctx_clone = ctx.clone(); + wait_until( + move || ctx_clone.is_cancelled(), + Duration::from_millis(100) + ).await.expect("context should be cancelled within 100ms"); + } + + /// this test ensures that cancelling a parent context cancels its children + #[tokio::test] + async fn test_child_cancellation() { + let parent = IrrevocableContext::new(&span_fixture(), "test_context"); + let child = parent.child("test_child"); + + assert!(!child.is_cancelled()); + parent.cancel(); // Cancel parent + + // Wait for cancellation to propagate + let child_clone = child.clone(); + wait_until( + move || child_clone.is_cancelled(), + Duration::from_millis(100) + ).await.expect("child context should be cancelled within 100ms"); + } + + /// this test ensures that running an operation completes successfully if its context is not canceled + #[tokio::test] + async fn test_successful_operation() { + let ctx = IrrevocableContext::new(&span_fixture(), "test_context"); + + let result = ctx.run(async { + Ok::(42) + }).await; + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), 42); + } + + /// this test ensures that running an operation respects cancellation + /// the operation should not complete if the context is canceled + #[tokio::test] + async fn test_run_with_cancellation() { + let ctx = IrrevocableContext::new(&span_fixture(), "test_context"); + + // Cancel the context + ctx.cancel(); + + // Wait for cancellation to be processed + let ctx_clone = ctx.clone(); + wait_until( + move || ctx_clone.is_cancelled(), + Duration::from_millis(100) + ).await.expect("context should be cancelled within 100ms"); + + let result = ctx.run(async { + // This should not execute due to cancellation + sleep(Duration::from_millis(10)).await; + Ok::(42) + + }).await; + + // The operation should return an error since the context was canceled before it could complete + assert!(result.is_err()); + // The error should indicate cancellation + assert!(result.unwrap_err().to_string().contains("context cancelled")); + } + + + /// this test ensures that nested child contexts are canceled when the root context is canceled + #[tokio::test] + async fn test_nested_children() { + let root = IrrevocableContext::new(&span_fixture(), "test_nested_children_root"); + let child1 = root.child("child1"); + let child2 = child1.child("child2"); + let grandchild = child2.child("grandchild"); + + // Initially, none should be cancelled + assert!(!grandchild.is_cancelled()); + + // Cancel root - should propagate to all children + root.cancel(); + + // Wait for cancellation to propagate to all children + let child1_clone = child1.clone(); + let child2_clone = child2.clone(); + let grandchild_clone = grandchild.clone(); + wait_until( + move || child1_clone.is_cancelled() && child2_clone.is_cancelled() && grandchild_clone.is_cancelled(), + Duration::from_millis(100) + ).await.expect("all child contexts should be cancelled within 100ms"); + } + + /// Test that we can create the error propagation hierarchy + #[test] + fn test_error_propagation_structure() { + let root = IrrevocableContext::new(&span_fixture(), "test_error_propagation_root"); + let child = root.child("error_prop_child"); + let grandchild = child.child("error_prop_grandchild"); + + grandchild.cancel(); + + // verify the parent chain exists + assert!(grandchild.inner.parent.is_some()); + assert!(child.inner.parent.is_some()); + assert!(root.inner.parent.is_none()); + } + + /// Test that throw_irrecoverable properly panics when called + #[test] + fn test_throw_irrecoverable_panics() { + let result = std::panic::catch_unwind(|| { + let root = IrrevocableContext::new(&span_fixture(), "test_throw_root"); + let child = root.child("test_throw_child"); + + // This should panic with the irrecoverable error message + child.throw_irrecoverable(anyhow::anyhow!("test irrecoverable error")); + }); + + // Verify that a panic occurred + assert!(result.is_err()); + + // Verify the panic message contains our error + if let Err(panic_payload) = result { + if let Some(panic_msg) = panic_payload.downcast_ref::() { + assert_eq!(panic_msg, "irrecoverable error: test irrecoverable error"); + } else{ + panic!("unexpected panic payload type"); + } + } + } +} \ No newline at end of file diff --git a/src/core/context/mod.rs b/src/core/context/mod.rs new file mode 100644 index 0000000..4166dab --- /dev/null +++ b/src/core/context/mod.rs @@ -0,0 +1,148 @@ +//! Cancelable context with irrecoverable error propagation +//! +//! This module provides a simplified context implementation focused on: +//! - Cancellation support via tokio's CancellationToken +//! - Parent-child context hierarchies +//! - Irrecoverable error propagation that terminates the application +//! +//! Unlike the full Go context API, this implementation focuses only on the core +//! functionality needed: cancellation and error propagation. + +#[cfg(test)] +mod context_test; + +use anyhow::Result; +use std::sync::Arc; +use tokio_util::sync::CancellationToken; +use tracing::Span; + +/// A cancelable context that supports parent-child hierarchies and irrecoverable error propagation. +/// +/// When an irrecoverable error is thrown, it propagates up to the root context and terminates the program. +/// Children automatically get cancelled when their parent is cancelled. +pub struct IrrevocableContext { + inner: Arc, +} + +struct ContextInner { + token: CancellationToken, + parent: Option, + span: Span, +} + +impl IrrevocableContext { + /// Create a new root context + pub fn new(parent_span: &Span, tag: &str) -> Self { + let span = tracing::span!(parent: parent_span, tracing::Level::TRACE, "irrevocable_context", tag = tag); + + Self { + inner: Arc::new(ContextInner { + token: CancellationToken::new(), + parent: None, + span, + }), + } + } + + /// Create a child context that inherits cancellation from the parent + pub fn child(&self, tag: &str) -> Self { + let span = tracing::span!(parent: &self.inner.span, tracing::Level::TRACE, "irrevocable_context_child", tag = tag); + Self { + inner: Arc::new(ContextInner { + token: self.inner.token.child_token(), + parent: Some(self.clone()), + span, + }), + } + } + + /// triggers Cancel in this context and all its children + /// to check if cancellation is complete, use `cancelled().await` + pub fn cancel(&self) { + let _enter = self.inner.span.enter(); + tracing::trace!("cancelling context"); + self.inner.token.cancel(); + } + + /// Check if the context is cancelled (non-blocking, private) + fn is_cancelled(&self) -> bool { + self.inner.token.is_cancelled() + } + + /// Wait for the context to be cancelled (async) + pub async fn cancelled(&self) { + self.inner.token.cancelled().await; + } + + /// Run an operation with cancellation support + /// If the context is cancelled before the operation completes, it returns an error. + /// otherwise, it returns the operation's result. + pub async fn run(&self, future: F) -> Result + where + F: std::future::Future>, + { + let _enter = self.inner.span.enter(); + + tokio::select! { + result = future => result, + _ = self.cancelled() => { + Err(anyhow::anyhow!("context cancelled")) + } + } + } + + /// Propagate an irrecoverable error up the context chain. + /// When it reaches the root context, it terminates the program. + /// there is no return from this function. + pub fn throw_irrecoverable(&self, err: anyhow::Error) -> ! { + let _enter = self.inner.span.enter(); + + // Propagate to parent if it exists + if let Some(parent) = &self.inner.parent { + tracing::error!("propagating irrecoverable error to parent context"); + parent.throw_irrecoverable(err); + } + + // Root context - panic with the error + panic!("irrecoverable error: {}", err); + } + + /// Run an operation, throwing irrecoverable error on failure + /// This is a convenience method that combines `run` and `throw_irrecoverable`. + /// If the operation succeeds, it returns the result. + /// If it fails, it propagates the error irrecoverably, terminating the program. + pub async fn run_or_throw(&self, future: F) -> T + where + F: std::future::Future>, + { + match self.run(future).await { + Ok(value) => value, + Err(err) => self.throw_irrecoverable(err), + } + } +} + +// Custom Debug implementation for better visibility into the context state +impl std::fmt::Debug for IrrevocableContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("IrrevocableContext") + .field("is_cancelled", &self.is_cancelled()) + .field("has_parent", &self.inner.parent.is_some()) + .finish() + } +} + +/// Custom Clone implementation to ensure shallow cloning behavior. +/// This implementation explicitly controls cloning to ensure that: +/// - Only Arc pointer is cloned (shallow), not the underlying data +/// - If future changes add non-shallow-clonable fields, this implementation +/// must be updated to maintain the shallow cloning semantics +impl Clone for IrrevocableContext { + fn clone(&self) -> Self { + // Shallow clone: cloned instances share the same underlying data via Arc + IrrevocableContext { + inner: Arc::clone(&self.inner), + } + } +} + diff --git a/src/core/mod.rs b/src/core/mod.rs index 15e7d84..323c974 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -1,8 +1,10 @@ +pub mod context; mod lookup; pub mod model; #[cfg(test)] pub mod testutil; +pub use crate::core::context::IrrevocableContext; pub use crate::core::lookup::array_lookup_table::ArrayLookupTable; pub use crate::core::lookup::array_lookup_table::LOOKUP_TABLE_LEVELS; pub use crate::core::lookup::LookupTable; diff --git a/src/core/testutil/fixtures.rs b/src/core/testutil/fixtures.rs index 487315d..f32b769 100644 --- a/src/core/testutil/fixtures.rs +++ b/src/core/testutil/fixtures.rs @@ -538,3 +538,63 @@ mod test { ); } } + +/// Waits until a condition becomes true within a specified timeout period. +/// This utility is designed for testing scenarios where you need to verify that +/// some asynchronous behavior occurs within a reasonable time frame. +/// +/// Uses a channel-based approach with cooperative +/// scheduling via `yield_now()` to be CPU-friendly while maintaining responsiveness. +/// +/// # Arguments +/// * `condition` - A closure that returns true when the expected condition is met +/// * `timeout` - Maximum time to wait for the condition to become true +/// +/// # Returns +/// * `Ok(())` if the condition becomes true within the timeout +/// * `Err(String)` if the timeout is exceeded before the condition is met +/// +/// # Example +/// ```ignore +/// // Wait for a context to be cancelled within 100ms +/// wait_until( +/// || context.is_cancelled(), +/// Duration::from_millis(100) +/// ).await.expect("context should be cancelled within 100ms"); +/// ``` +pub async fn wait_until( + mut condition: F, + timeout: Duration, +) -> Result<(), String> +where + F: FnMut() -> bool + Send + 'static, +{ + let (tx, rx) = tokio::sync::oneshot::channel::>(); + + // Spawn a single task that polls the condition and sends the result + let condition_task = tokio::task::spawn_blocking(move || { + loop { + if condition() { + // Condition met - send success and return + if tx.send(Ok(())).is_err() { + // Receiver dropped, but we're done anyway + } + return; + } + // Cooperative scheduling - yield to other threads + std::thread::yield_now(); + } + }); + + // Wait for the result with timeout + let result = match tokio::time::timeout(timeout, rx).await { + Ok(Ok(result)) => result, + Ok(Err(_)) => Err("channel closed unexpectedly".to_string()), + Err(_) => Err(format!("condition not met within timeout of {:?}", timeout)), + }; + + // Clean up the task if it's still running + condition_task.abort(); + + result +} diff --git a/src/node/base_node.rs b/src/node/base_node.rs index 4139f9d..ff88655 100644 --- a/src/node/base_node.rs +++ b/src/node/base_node.rs @@ -1,6 +1,6 @@ use crate::core::model::direction::Direction; use crate::core::{ - Identifier, IdSearchReq, IdSearchRes, LookupTable, MembershipVector, + IrrevocableContext, Identifier, IdSearchReq, IdSearchRes, LookupTable, MembershipVector, }; #[cfg(test)] // TODO: Remove once BaseNode is used in production code. use crate::network::MessageProcessor; @@ -21,6 +21,7 @@ pub(crate) struct BaseNode { lt: Box, net: Box, span: Span, + ctx: IrrevocableContext, } impl Node for BaseNode { @@ -231,6 +232,9 @@ impl BaseNode { let span = tracing::span!(parent: parent_span, tracing::Level::TRACE, "base_node"); let _enter = span.enter(); + // Create a cancelable context for this node + let ctx = IrrevocableContext::new(&span, "base_node_context"); + tracing::trace!( "creating BaseNode with id {:?}, mem_vec {:?}", id, @@ -243,6 +247,7 @@ impl BaseNode { lt, net, span: span.clone(), + ctx, }; // Create a MessageProcessor from this node, instead of casting directly @@ -253,9 +258,11 @@ impl BaseNode { id ); - clone_net - .register_processor(processor) - .map_err(|e| anyhow!("could not register node in network: {}", e))?; + // Use throw_irrecoverable for critical startup failures + if let Err(e) = clone_net.register_processor(processor) { + let error = anyhow!("could not register node in network: {}", e); + node.ctx.throw_irrecoverable(error); + } tracing::trace!( "successfully created and registered BaseNode {:?}", @@ -272,7 +279,7 @@ impl BaseNode { impl PartialEq for BaseNode { fn eq(&self, other: &Self) -> bool { self.id == other.id && self.mem_vec == other.mem_vec - // ignore lt for equality check as comparing trait objects is non-trivial + // ignore lt and ctx for equality check as comparing trait objects is non-trivial } } @@ -293,6 +300,7 @@ impl Clone for BaseNode { lt: self.lt.clone(), net: self.net.clone(), span: self.span.clone(), + ctx: self.ctx.clone(), } } } @@ -310,12 +318,14 @@ mod tests { fn test_base_node() { let id = random_identifier(); let mem_vec = random_membership_vector(); + let span = span_fixture(); let node = BaseNode { id, mem_vec, - lt: Box::new(ArrayLookupTable::new(&span_fixture())), + lt: Box::new(ArrayLookupTable::new(&span)), net: Box::new(Unimock::new(())), // No expectations needed for direct struct construction - span: span_fixture(), + span: span.clone(), + ctx: IrrevocableContext::new(&span, "base_node_test_context"), }; assert_eq!(node.get_identifier(), &id); assert_eq!(node.get_membership_vector(), &mem_vec);