Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
5954edf
Merge remote-tracking branch 'origin/main' into thep2p/24-underlay-in…
thep2p Jul 17, 2025
8e8d720
Introduces network communication layer
thep2p Jul 17, 2025
a55250b
Adds mock network implementation
thep2p Jul 17, 2025
b336eb4
Passes NetworkHub to MockNetwork on creation
thep2p Jul 17, 2025
608f41e
Adds mock network testing capabilities.
thep2p Jul 17, 2025
8c2c8d6
blocking test with smart pointers
thep2p Jul 18, 2025
dfe3d8f
Refactors network creation to use Rc
thep2p Jul 23, 2025
83e07f5
Refactors mock network to use Rc
thep2p Jul 23, 2025
cfa5298
Adds incoming message processing to mock network
thep2p Jul 24, 2025
1f9118e
Renames `send_message` to `incoming_message`
thep2p Jul 24, 2025
4fa2c99
Removes unused message variable
thep2p Jul 24, 2025
091bd78
Removes unused network functions
thep2p Jul 24, 2025
e3f204b
Removes payload from Message struct
thep2p Jul 24, 2025
7dcb8ae
Renames MessageType to Payload
thep2p Jul 24, 2025
57c2ac2
Improves mock network message processing
thep2p Jul 24, 2025
1048328
Updates module visibility for testing
thep2p Jul 24, 2025
073b6ce
Adds documentation to mock network components
thep2p Jul 27, 2025
4f67119
Adds concurrent message sending test
thep2p Jul 31, 2025
91ba2a8
Refactors network mock for thread safety
thep2p Jul 31, 2025
986333a
Relaxes trait bound on MessageProcessor
thep2p Jul 31, 2025
83f74a6
Merge branch 'main' into thep2p/24-concurrent-mock-network
thep2p Aug 6, 2025
60ec21b
docs: Align doc comment for incoming_message
thep2p Aug 7, 2025
0c8dbf7
refactor: Remove unused Rc and RefCell imports
thep2p Aug 7, 2025
c7197fe
refactor: Modernize and simplify mock network test code
thep2p Aug 7, 2025
6ead5e2
refactor(network): Remove unnecessary unsafe Send impl for MockMessag…
thep2p Aug 7, 2025
513bd29
feat(network): Improve mutex lock error handling in MockNetwork
thep2p Aug 7, 2025
29f70cf
style(misc): Add missing newlines at end of files
thep2p Aug 7, 2025
c6f5022
docs: Add CLAUDE.md for AI assistant guidance
thep2p Aug 7, 2025
5e56551
Adds skip-graphs paper PDF.
thep2p Aug 9, 2025
085beb2
Adds debug derive to IdentifierSearchRequest
thep2p Aug 9, 2025
4e53b5d
feat: Expose network module and base_node
thep2p Aug 9, 2025
79a4332
feat: Prepare LocalNode for network integration
thep2p Aug 9, 2025
3b9bd37
feat: Implement core network message processing in LocalNode
thep2p Aug 9, 2025
5ef8499
feat: Begin implementing LocalNode::join protocol
thep2p Aug 9, 2025
bd47aa1
refactor: Adjust LocalNode tests for network field
thep2p Aug 9, 2025
ce420fc
feat: Expose mock network modules publicly
thep2p Aug 9, 2025
97189d8
refactor: Add Debug derive to core mock network structs
thep2p Aug 9, 2025
c9effed
refactor: Standardize Payload::TestMessage usage and enhance MockMess…
thep2p Aug 9, 2025
cf8fc34
Adds skip graph integration tests
thep2p Aug 9, 2025
f854757
Improves test failure messages and adds linting instructions
thep2p Aug 9, 2025
900ec06
Merge remote-tracking branch 'origin/main' into thep2p/local-skip-graph
thep2p Aug 13, 2025
a0418b6
refactor: Remove unused imports from network_test.rs
thep2p Aug 13, 2025
9c011f2
feat: Allow dead code for LocalNode's network setup functions
thep2p Aug 13, 2025
2d9ae76
Removes unused IdentifierSearcher trait
thep2p Aug 13, 2025
d341bb3
feat: Introduce No-Op Network for Testing
thep2p Aug 13, 2025
1063d3e
refactor: Simplify LocalNode constructor and network management
thep2p Aug 13, 2025
bb81e57
test: Update LocalNode call sites after constructor refactor
thep2p Aug 13, 2025
b905170
Refactor: Rename skip graph fixture function for clarity
thep2p Aug 13, 2025
62f961d
Feature: Enhance message processing with origin node identifier
thep2p Aug 13, 2025
7fe8404
Refactor: Integrate origin ID into mock network routing
thep2p Aug 13, 2025
6644e32
Chore: Add miscellaneous TODO comments for future work
thep2p Aug 13, 2025
b4874ad
Refactor: Reverse 'origin_id' and 'message' parameters in MessageProc…
thep2p Aug 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@
- PR title: same format; no description or labels—maintainers handle that.
- Explain new/updated tests with doc comments.
- Update documentation and `README.md` for any changes.

## Linting Instructions
- Use `make lint` to run linting, fix all linting issues before committing.
Binary file added skip-graphs-paper.pdf
Binary file not shown.
1 change: 1 addition & 0 deletions src/core/search/id_search_req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::core::lookup::lookup_table::LookupTableLevel;
use crate::core::model::direction::Direction;
use crate::core::Identifier;

#[derive(Debug)]
pub struct IdentifierSearchRequest {
pub target: Identifier,
pub level: LookupTableLevel,
Expand Down
18 changes: 0 additions & 18 deletions src/core/search/identifier_searcher.rs

This file was deleted.

1 change: 0 additions & 1 deletion src/core/search/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
pub(crate) mod id_search_req;
pub(crate) mod id_search_res;
mod identifier_searcher;
79 changes: 78 additions & 1 deletion src/core/testutil/fixtures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ use crate::core::model::identifier::{MAX, ZERO};
use std::thread::JoinHandle;
use std::time::Duration;
use test_imports::*;
use crate::local::base_node::LocalNode;
use crate::network::mock::hub::NetworkHub;
use crate::network::Network;
use crate::core::Node;
use std::sync::{Arc, Mutex};

/// Generates a random identifier.
///
Expand Down Expand Up @@ -428,10 +433,82 @@ pub fn span_fixture() -> tracing::Span {
// Create a new tracing span with the name "test_span" at TRACE level.
// Subscriber level controls the minimum log level to display (e.g., DEBUG shows debug and above).
// Log macros inside spans determine the actual log level of each event.
// This span level is just a label for grouping and doesnt influence what gets logged.
// This span level is just a label for grouping and doesn't influence what gets logged.
tracing::span!(tracing::Level::TRACE, "test_span")
}

/// Creates a complete skip graph with n nodes connected via MockNetwork.
///
/// This function creates a fully connected skip graph where:
/// - All nodes have unique sorted identifiers
/// - Each node has a random membership vector
/// - All nodes are connected through a shared NetworkHub
/// - Nodes insert themselves one by one using the join algorithm
/// - The first node becomes the introducer for subsequent nodes
///
/// # Arguments
/// * `n` - The number of nodes to create in the skip graph
///
/// # Returns
/// A tuple containing:
/// - Vector of LocalNode instances representing the skip graph nodes
/// - The shared NetworkHub used for communication
///
/// # Panics
/// This function will panic if:
/// - n is 0 (cannot create an empty skip graph)
/// - Network operations fail during node creation or joining
///
pub fn new_local_skip_graph(n: usize) -> anyhow::Result<(Vec<LocalNode>, Arc<Mutex<NetworkHub>>)> {
if n == 0 {
return Err(anyhow::anyhow!("Cannot create skip graph with 0 nodes"));
}

let _span = span_fixture();

// Create a shared network hub for all nodes
let hub = NetworkHub::new();

// Generate sorted identifiers for all nodes to ensure proper ordering
let identifiers = random_sorted_identifiers(n);
let mut nodes = Vec::with_capacity(n);

// Create all nodes first
for &id in &identifiers {
let mem_vec = random_membership_vector();
let lt = Box::new(ArrayLookupTable::new(&span_fixture()));

// Create network for this node and register it with the hub
let network = NetworkHub::new_mock_network(hub.clone(), id)?;

// Create the LocalNode with network capability
let node = LocalNode::new(id, mem_vec, lt, network.clone());

// Register the node as a message processor for its network
// TODO: a node registering itself as processor to network must be done internally in the node
let node_processor = Arc::new(Mutex::new(node.clone()));
network
.lock()
.map_err(|_| anyhow::anyhow!("Failed to acquire network lock"))?
.register_processor(Box::new(node_processor))?;

nodes.push(node);
}

// Now perform the join operations to build the skip graph structure
if !nodes.is_empty() {
// TODO: consider using a random node each time as the introducer
// The first node is already in the skip graph (it's the introducer)
for i in 1..nodes.len() {
let introducer = nodes[0].clone();
nodes[i].join(std::rc::Rc::new(introducer))?;
}
}

tracing::debug!("Successfully created skip graph with {} nodes", n);
Ok((nodes, hub))
}

mod test {
use crate::core::model::identifier::ComparisonResult::CompareLess;
use crate::core::model::identifier::{MAX, ZERO};
Expand Down
4 changes: 1 addition & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
pub mod core;
mod local;
// TODO: for the time being, the network module is solely used in testing.
#[cfg(test)]
mod network;
pub mod network;
124 changes: 120 additions & 4 deletions src/local/base_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,23 @@ use crate::core::{
Identifier, IdentifierSearchRequest, IdentifierSearchResult, LookupTable, MembershipVector,
Node,
};
use crate::network::{Message, MessageProcessor, Network, Payload};
use anyhow::{anyhow, Context};
use std::fmt;
use std::fmt::Formatter;
use std::rc::Rc;
use std::sync::{Arc, Mutex};

/// LocalNode is a struct that represents a single node in the local implementation of the skip graph.
pub(crate) struct LocalNode {
id: Identifier,
mem_vec: MembershipVector,
lt: Box<dyn LookupTable>,
network: Option<Arc<Mutex<dyn Network>>>,
}

impl Node for LocalNode {
// TODO: this must be the address of the node in the network, not just a reference to itself.
type Address = Rc<LocalNode>;

fn get_identifier(&self) -> &Identifier {
Expand Down Expand Up @@ -144,17 +149,118 @@ impl Node for LocalNode {
todo!()
}

fn join(&self, _introducer: Self::Address) -> anyhow::Result<()> {
todo!()
fn join(&self, introducer: Self::Address) -> anyhow::Result<()> {
// Implement the join protocol based on Algorithm 2 from the skip graphs paper
// Step 1: Search for our own identifier to find our position at level 0
let search_req = IdentifierSearchRequest::new(self.id, 0, Direction::Left);

// In a full implementation, we would search through the introducer
// For now, we'll use the introducer's search capability directly
let _search_result = introducer.search_by_id(&search_req)?;

// Step 2: Insert ourselves at level 0
// This would involve updating the lookup tables of neighboring nodes

// Step 3: Iteratively find nodes at higher levels with matching membership vector prefixes
// and insert ourselves at those levels
let mut level = 0;
loop {
// Find a node at the current level with matching membership vector prefix
// If no such node exists, we're done joining
// For now, we'll just return success after one iteration
if level > 0 {
break;
}
level += 1;
}

tracing::debug!("Node {} joined the skip graph", self.id);
Ok(())
}
}

impl LocalNode {
/// Create a new `LocalNode` with the provided identifier, membership vector
/// and lookup table.
#[cfg(test)]
pub(crate) fn new(id: Identifier, mem_vec: MembershipVector, lt: Box<dyn LookupTable>) -> Self {
LocalNode { id, mem_vec, lt }
pub(crate) fn new(id: Identifier, mem_vec: MembershipVector, lt: Box<dyn LookupTable>, network: Arc<Mutex<dyn Network>>) -> Self {
LocalNode { id, mem_vec, lt, network: Some(network) }
}

/// Sends a message through the network if available
fn send_message(&self, message: Message) -> anyhow::Result<()> {
if let Some(ref network) = self.network {
network
.lock()
.map_err(|_| anyhow!("Failed to acquire network lock"))?
.send_message(message)
.context("Failed to send message through network")
} else {
Err(anyhow!("No network connection available"))
}
}
}

impl MessageProcessor for LocalNode {
/// Process incoming network messages for skip graph operations
fn process_incoming_message(&mut self, origin_id: Identifier, message: Message) -> anyhow::Result<()> {
match message.payload {
Payload::SearchRequest(search_req) => {
// Process the search request using the local node's search capability
match self.search_by_id(&search_req) {
Ok(search_result) => {
// Create a response message with the search result
let response_message = Message {
payload: Payload::SearchResponse(search_result),
target_node_id: origin_id, // Send response back to original requester
};

self.send_message(response_message)
.context("Failed to send search response")
}
Err(e) => Err(anyhow!("Search failed: {}", e)),
}
}
Payload::JoinRequest { node_id, level } => {
// Handle join request from another node
// For now, just acknowledge the join request
// In a full implementation, this would involve the skip graph join protocol
let response = Payload::JoinResponse {
success: true,
message: format!("Join request received for node {node_id} at level {level}"),
};

let response_message = Message {
payload: response,
target_node_id: origin_id,
};

self.send_message(response_message)
.context("Failed to send join response")
}
Payload::JoinResponse { success, message } => {
// Handle join response
if success {
tracing::debug!("Join successful: {}", message);
} else {
tracing::warn!("Join failed: {}", message);
}
Ok(())
}
Payload::SearchResponse(search_result) => {
// Handle search response
tracing::debug!(
"Received search response: target={}, result={}",
search_result.target(),
search_result.result()
);
Ok(())
}
Payload::TestMessage(msg) => {
tracing::debug!("Received test message: {}", msg);
Ok(())
}
}
}
}

Expand Down Expand Up @@ -183,6 +289,7 @@ impl Clone for LocalNode {
id: self.id,
mem_vec: self.mem_vec,
lt: self.lt.clone(),
network: self.network.clone(),
}
}
}
Expand All @@ -208,6 +315,7 @@ mod tests {
id,
mem_vec,
lt: Box::new(ArrayLookupTable::new(&span_fixture())),
network: None,
};
assert_eq!(node.get_identifier(), &id);
assert_eq!(node.get_membership_vector(), &mem_vec);
Expand Down Expand Up @@ -241,6 +349,7 @@ mod tests {
id: random_identifier(),
mem_vec: random_membership_vector(),
lt: Box::new(lt.clone()),
network: None,
};

let direction = Direction::Left;
Expand Down Expand Up @@ -291,6 +400,7 @@ mod tests {
id: random_identifier(),
mem_vec: random_membership_vector(),
lt: Box::new(lt.clone()),
network: None,
};

let actual_result = node.search_by_id(&req).unwrap();
Expand Down Expand Up @@ -356,6 +466,7 @@ mod tests {
id: random_identifier(),
mem_vec: random_membership_vector(),
lt: Box::new(lt.clone()),
network: None,
};

let direction = Direction::Left;
Expand Down Expand Up @@ -416,6 +527,7 @@ mod tests {
id: random_identifier(),
mem_vec: random_membership_vector(),
lt: Box::new(lt.clone()),
network: None,
};

let direction = Direction::Right;
Expand Down Expand Up @@ -449,6 +561,7 @@ mod tests {
id: random_identifier(),
mem_vec: random_membership_vector(),
lt: Box::new(lt.clone()),
network: None,
};

// This test should ensure that when the exact target is found, it returns the correct level and identifier.
Expand Down Expand Up @@ -494,6 +607,7 @@ mod tests {
id: random_identifier(),
mem_vec: random_membership_vector(),
lt: Box::new(lt.clone()),
network: None,
});

// Ensure the target is not the same as the node's identifier
Expand Down Expand Up @@ -575,6 +689,7 @@ mod tests {
id: random_identifier(),
mem_vec: random_membership_vector(),
lt: Box::new(lt.clone()),
network: None,
});

// Ensure the target is not the same as the node's identifier
Expand Down Expand Up @@ -683,6 +798,7 @@ mod tests {
id: random_identifier(),
mem_vec: random_membership_vector(),
lt: Box::new(MockErrorLookupTable),
network: None,
};

// Create a random search request (any search request will return an error as
Expand Down
4 changes: 3 additions & 1 deletion src/local/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod base_node;
pub mod base_node;
#[cfg(test)]
mod search_by_id_test;
#[cfg(test)]
mod skip_graph_integration_test;
2 changes: 2 additions & 0 deletions src/local/search_by_id_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::base_node::LocalNode;
use crate::core::model::direction::Direction;
use crate::core::testutil::fixtures::{random_membership_vector, span_fixture};
use crate::core::{ArrayLookupTable, Identifier, IdentifierSearchRequest, Node};
use crate::network::mock::noop_network::new_noop_network;

// TODO: move other tests from base_node.rs here
/// Tests fallback behavior of `search_by_id` when no neighbors exist.
Expand All @@ -16,6 +17,7 @@ fn test_search_by_id_singleton_fallback() {
id,
mem_vec,
Box::new(ArrayLookupTable::new(&span_fixture())),
new_noop_network(),
);

// Left and right searches for identifiers 5 and 15
Expand Down
Loading
Loading