From 00b6bbe9d7112e97b07937f9bda355e08736c8a4 Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Tue, 16 Sep 2025 16:50:17 -0600 Subject: [PATCH 01/29] create forkchoice tree visualization function --- pkgs/node/src/tree_visualizer.zig | 106 ++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) create mode 100644 pkgs/node/src/tree_visualizer.zig diff --git a/pkgs/node/src/tree_visualizer.zig b/pkgs/node/src/tree_visualizer.zig new file mode 100644 index 00000000..f24189f6 --- /dev/null +++ b/pkgs/node/src/tree_visualizer.zig @@ -0,0 +1,106 @@ +const std = @import("std"); +const Allocator = std.mem.Allocator; +const fcFactory = @import("./forkchoice.zig"); + +/// Builds a tree visualization of the fork choice tree +pub fn buildTreeVisualization(allocator: Allocator, nodes: []const fcFactory.ProtoNode) ![]const u8 { + var tree_lines = std.ArrayList(u8).init(allocator); + defer tree_lines.deinit(); + + // Find root nodes (nodes with no parent) + var root_indices = std.ArrayList(usize).init(allocator); + defer root_indices.deinit(); + + for (nodes, 0..) |node, i| { + if (node.parent == null) { + try root_indices.append(i); + } + } + + // Build tree visualization starting from roots + for (root_indices.items) |root_idx| { + try visualizeTreeBranch(allocator, &tree_lines, nodes, root_idx, 0, ""); + } + + return tree_lines.toOwnedSlice(); +} + +/// Recursively builds a tree branch visualization +fn visualizeTreeBranch(allocator: Allocator, tree_lines: *std.ArrayList(u8), nodes: []const fcFactory.ProtoNode, node_idx: usize, depth: usize, prefix: []const u8) !void { + const node = nodes[node_idx]; + const hex_root = std.fmt.allocPrint(allocator, "0x{s}", .{std.fmt.fmtSliceHexLower(node.blockRoot[0..4])}) catch "0x????"; + defer allocator.free(hex_root); + + const node_line = std.fmt.allocPrint(allocator, "{s}{s} ({d})", .{ prefix, hex_root, node.slot }) catch return; + defer allocator.free(node_line); + + try tree_lines.appendSlice(node_line); + + var children = std.ArrayList(usize).init(allocator); + defer children.deinit(); + + for (nodes, 0..) |child_node, i| { + if (child_node.parent) |parent_idx| { + if (parent_idx == node_idx) { + try children.append(i); + } + } + } + + if (children.items.len > 0) { + const child_count_comment = std.fmt.allocPrint(allocator, " // has {d} child branch{s}", .{ children.items.len, if (children.items.len == 1) "" else "es" }) catch return; + defer allocator.free(child_count_comment); + try tree_lines.appendSlice(child_count_comment); + } + try tree_lines.append('\n'); + + for (children.items, 0..) |child_idx, child_i| { + const child_node = nodes[child_idx]; + const is_last_child = child_i == children.items.len - 1; + + const indent = createTreeIndent(allocator, depth, is_last_child) catch return; + defer allocator.free(indent); + + // Check for missing slots between parent and child + if (child_node.slot > node.slot + 1) { + const missing_line = std.fmt.allocPrint(allocator, "{s}[slots {d}..{d}] ─┘ ", .{ indent, node.slot + 1, child_node.slot - 1 }) catch return; + defer allocator.free(missing_line); + try tree_lines.appendSlice(missing_line); + } else { + try tree_lines.appendSlice(indent); + } + + // Recursively process child + try visualizeTreeBranch(allocator, tree_lines, nodes, child_idx, depth + 1, ""); + } +} + +/// Helper function to create proper tree indentation +fn createTreeIndent(allocator: Allocator, depth: usize, is_last_child: bool) ![]const u8 { + var indent = std.ArrayList(u8).init(allocator); + defer indent.deinit(); + + // Add indentation for each depth level + for (0..depth) |_| { + try indent.appendSlice(" "); + } + + // Add tree characters based on position + const tree_char = if (is_last_child) "└── " else "├── "; + try indent.appendSlice(tree_char); + + return indent.toOwnedSlice(); +} + +/// Helper function to create vertical continuation lines +fn createVerticalLines(allocator: Allocator, depth: usize) ![]const u8 { + var lines = std.ArrayList(u8).init(allocator); + defer lines.deinit(); + + // Add vertical lines for each depth level + for (0..depth) |_| { + try lines.appendSlice(" │"); + } + + return lines.toOwnedSlice(); +} From 5ae1e8652df1da3b32c3776b36b454f2de93c9ca Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Tue, 16 Sep 2025 16:50:51 -0600 Subject: [PATCH 02/29] build tree visualization in printSlot function --- pkgs/node/src/chain.zig | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkgs/node/src/chain.zig b/pkgs/node/src/chain.zig index 007df6ad..6468d2dc 100644 --- a/pkgs/node/src/chain.zig +++ b/pkgs/node/src/chain.zig @@ -18,6 +18,7 @@ const jsonToString = zeam_utils.jsonToString; pub const fcFactory = @import("./forkchoice.zig"); const constants = @import("./constants.zig"); +const tree_visualizer = @import("./tree_visualizer.zig"); const node = @import("./node.zig"); const PeerInfo = node.PeerInfo; @@ -252,6 +253,11 @@ pub const BeamChain = struct { const blocks_behind = if (slot > fc_head.slot) slot - fc_head.slot else 0; const is_timely = fc_head.timeliness; + // Build tree visualization + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + const tree_visual = tree_visualizer.buildTreeVisualization(arena.allocator(), self.forkChoice.protoArray.nodes.items) catch "Tree visualization failed"; + self.module_logger.info( \\ \\+===============================================================+ From 43c6ec99ff45a39699aef7ab1c2356454698658e Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Tue, 16 Sep 2025 16:51:05 -0600 Subject: [PATCH 03/29] log forkchoice tree visual during printSlot --- pkgs/node/src/chain.zig | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkgs/node/src/chain.zig b/pkgs/node/src/chain.zig index 6468d2dc..270e2dfc 100644 --- a/pkgs/node/src/chain.zig +++ b/pkgs/node/src/chain.zig @@ -272,6 +272,9 @@ pub const BeamChain = struct { \\+---------------------------------------------------------------+ \\ Latest Justified: Slot {d:>6} | Root: 0x{any} \\ Latest Finalized: Slot {d:>6} | Root: 0x{any} + \\+---------------------------------------------------------------+ + \\ ForkChoice Tree: + \\{s} \\+===============================================================+ \\ , .{ @@ -287,6 +290,7 @@ pub const BeamChain = struct { std.fmt.fmtSliceHexLower(&justified.root), finalized.slot, std.fmt.fmtSliceHexLower(&finalized.root), + tree_visual, }); } From 7e17b92e579f2472d13000f187b93b1e95892d2e Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Tue, 16 Sep 2025 17:11:53 -0600 Subject: [PATCH 04/29] add configurable depth limit --- pkgs/node/src/tree_visualizer.zig | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/pkgs/node/src/tree_visualizer.zig b/pkgs/node/src/tree_visualizer.zig index f24189f6..0216a8c7 100644 --- a/pkgs/node/src/tree_visualizer.zig +++ b/pkgs/node/src/tree_visualizer.zig @@ -2,8 +2,8 @@ const std = @import("std"); const Allocator = std.mem.Allocator; const fcFactory = @import("./forkchoice.zig"); -/// Builds a tree visualization of the fork choice tree -pub fn buildTreeVisualization(allocator: Allocator, nodes: []const fcFactory.ProtoNode) ![]const u8 { +/// Builds a tree visualization of the fork choice tree with optional depth limit +pub fn buildTreeVisualization(allocator: Allocator, nodes: []const fcFactory.ProtoNode, max_depth: ?usize) ![]const u8 { var tree_lines = std.ArrayList(u8).init(allocator); defer tree_lines.deinit(); @@ -19,14 +19,14 @@ pub fn buildTreeVisualization(allocator: Allocator, nodes: []const fcFactory.Pro // Build tree visualization starting from roots for (root_indices.items) |root_idx| { - try visualizeTreeBranch(allocator, &tree_lines, nodes, root_idx, 0, ""); + try visualizeTreeBranch(allocator, &tree_lines, nodes, root_idx, 0, "", max_depth); } return tree_lines.toOwnedSlice(); } /// Recursively builds a tree branch visualization -fn visualizeTreeBranch(allocator: Allocator, tree_lines: *std.ArrayList(u8), nodes: []const fcFactory.ProtoNode, node_idx: usize, depth: usize, prefix: []const u8) !void { +fn visualizeTreeBranch(allocator: Allocator, tree_lines: *std.ArrayList(u8), nodes: []const fcFactory.ProtoNode, node_idx: usize, depth: usize, prefix: []const u8, max_depth: ?usize) !void { const node = nodes[node_idx]; const hex_root = std.fmt.allocPrint(allocator, "0x{s}", .{std.fmt.fmtSliceHexLower(node.blockRoot[0..4])}) catch "0x????"; defer allocator.free(hex_root); @@ -36,6 +36,17 @@ fn visualizeTreeBranch(allocator: Allocator, tree_lines: *std.ArrayList(u8), nod try tree_lines.appendSlice(node_line); + // Check if we've reached the maximum depth + if (max_depth) |max| { + if (depth >= max) { + const truncated_comment = std.fmt.allocPrint(allocator, " // ... (truncated at depth {d})", .{max}) catch return; + defer allocator.free(truncated_comment); + try tree_lines.appendSlice(truncated_comment); + try tree_lines.append('\n'); + return; + } + } + var children = std.ArrayList(usize).init(allocator); defer children.deinit(); @@ -71,7 +82,7 @@ fn visualizeTreeBranch(allocator: Allocator, tree_lines: *std.ArrayList(u8), nod } // Recursively process child - try visualizeTreeBranch(allocator, tree_lines, nodes, child_idx, depth + 1, ""); + try visualizeTreeBranch(allocator, tree_lines, nodes, child_idx, depth + 1, "", max_depth); } } From 3f795781f33782c59f7183f4ff7c94b0002a5a93 Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Tue, 16 Sep 2025 17:12:17 -0600 Subject: [PATCH 05/29] add tree_depth parameter to printSlot --- pkgs/node/src/chain.zig | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkgs/node/src/chain.zig b/pkgs/node/src/chain.zig index 270e2dfc..f2d75906 100644 --- a/pkgs/node/src/chain.zig +++ b/pkgs/node/src/chain.zig @@ -154,7 +154,7 @@ pub const BeamChain = struct { // interval to vote so we should put out the chain status information to the user along with // latest head which most likely should be the new block received and processed const islot: isize = @intCast(slot); - self.printSlot(islot, self.connected_peers.count()); + self.printSlot(islot, null, self.connected_peers.count()); } // check if log rotation is needed self.zeam_logger_config.maybeRotate() catch |err| { @@ -232,7 +232,7 @@ pub const BeamChain = struct { return vote; } - pub fn printSlot(self: *Self, islot: isize, peer_count: usize) void { + pub fn printSlot(self: *Self, islot: isize, tree_depth: ?usize, peer_count: usize) void { // head should be auto updated if receieved a block or block proposal done // however it doesn't get updated unless called updatehead even though process block // logs show it has been updated. debug and fix the call below @@ -256,7 +256,7 @@ pub const BeamChain = struct { // Build tree visualization var arena = std.heap.ArenaAllocator.init(self.allocator); defer arena.deinit(); - const tree_visual = tree_visualizer.buildTreeVisualization(arena.allocator(), self.forkChoice.protoArray.nodes.items) catch "Tree visualization failed"; + const tree_visual = tree_visualizer.buildTreeVisualization(arena.allocator(), self.forkChoice.protoArray.nodes.items, tree_depth) catch "Tree visualization failed"; self.module_logger.info( \\ @@ -603,16 +603,16 @@ test "printSlot output demonstration" { // Test printSlot at different slots to see the output std.debug.print("\n=== PRINTING CHAIN STATUS AT SLOT 0 ===\n", .{}); - beam_chain.printSlot(0, beam_chain.connected_peers.count()); + beam_chain.printSlot(0, null, beam_chain.connected_peers.count()); std.debug.print("\n=== PRINTING CHAIN STATUS AT SLOT 1 ===\n", .{}); - beam_chain.printSlot(1, beam_chain.connected_peers.count()); + beam_chain.printSlot(1, null, beam_chain.connected_peers.count()); std.debug.print("\n=== PRINTING CHAIN STATUS AT SLOT 2 ===\n", .{}); - beam_chain.printSlot(2, beam_chain.connected_peers.count()); + beam_chain.printSlot(2, null, beam_chain.connected_peers.count()); std.debug.print("\n=== PRINTING CHAIN STATUS AT SLOT 5 (BEHIND) ===\n", .{}); - beam_chain.printSlot(5, beam_chain.connected_peers.count()); + beam_chain.printSlot(5, null, beam_chain.connected_peers.count()); // Verify that the chain state is as expected try std.testing.expect(beam_chain.forkChoice.protoArray.nodes.items.len == mock_chain.blocks.len); From 37a1ff6efe2066908e06414fa65d70738af30b86 Mon Sep 17 00:00:00 2001 From: Scotty <66335769+ScottyPoi@users.noreply.github.com> Date: Tue, 16 Sep 2025 17:42:23 -0600 Subject: [PATCH 06/29] delete unused helper Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- pkgs/node/src/tree_visualizer.zig | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/pkgs/node/src/tree_visualizer.zig b/pkgs/node/src/tree_visualizer.zig index 0216a8c7..6dd90c38 100644 --- a/pkgs/node/src/tree_visualizer.zig +++ b/pkgs/node/src/tree_visualizer.zig @@ -103,15 +103,3 @@ fn createTreeIndent(allocator: Allocator, depth: usize, is_last_child: bool) ![] return indent.toOwnedSlice(); } -/// Helper function to create vertical continuation lines -fn createVerticalLines(allocator: Allocator, depth: usize) ![]const u8 { - var lines = std.ArrayList(u8).init(allocator); - defer lines.deinit(); - - // Add vertical lines for each depth level - for (0..depth) |_| { - try lines.appendSlice(" │"); - } - - return lines.toOwnedSlice(); -} From aad8a6760976df09550359975508ba095f3379f5 Mon Sep 17 00:00:00 2001 From: Scotty <66335769+ScottyPoi@users.noreply.github.com> Date: Tue, 16 Sep 2025 17:43:02 -0600 Subject: [PATCH 07/29] Update pkgs/node/src/tree_visualizer.zig Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- pkgs/node/src/tree_visualizer.zig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkgs/node/src/tree_visualizer.zig b/pkgs/node/src/tree_visualizer.zig index 6dd90c38..dbb49bc7 100644 --- a/pkgs/node/src/tree_visualizer.zig +++ b/pkgs/node/src/tree_visualizer.zig @@ -28,7 +28,7 @@ pub fn buildTreeVisualization(allocator: Allocator, nodes: []const fcFactory.Pro /// Recursively builds a tree branch visualization fn visualizeTreeBranch(allocator: Allocator, tree_lines: *std.ArrayList(u8), nodes: []const fcFactory.ProtoNode, node_idx: usize, depth: usize, prefix: []const u8, max_depth: ?usize) !void { const node = nodes[node_idx]; - const hex_root = std.fmt.allocPrint(allocator, "0x{s}", .{std.fmt.fmtSliceHexLower(node.blockRoot[0..4])}) catch "0x????"; + const hex_root = std.fmt.allocPrint(allocator, "0x{s}", .{std.fmt.fmtSliceHexLower(node.blockRoot[0..4])}) catch return; defer allocator.free(hex_root); const node_line = std.fmt.allocPrint(allocator, "{s}{s} ({d})", .{ prefix, hex_root, node.slot }) catch return; From 7a3ba81c071ad9510ab474885ec4f245c50bee82 Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Tue, 16 Sep 2025 17:45:49 -0600 Subject: [PATCH 08/29] replace catch returns with try --- pkgs/node/src/tree_visualizer.zig | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/pkgs/node/src/tree_visualizer.zig b/pkgs/node/src/tree_visualizer.zig index dbb49bc7..2242718f 100644 --- a/pkgs/node/src/tree_visualizer.zig +++ b/pkgs/node/src/tree_visualizer.zig @@ -28,10 +28,10 @@ pub fn buildTreeVisualization(allocator: Allocator, nodes: []const fcFactory.Pro /// Recursively builds a tree branch visualization fn visualizeTreeBranch(allocator: Allocator, tree_lines: *std.ArrayList(u8), nodes: []const fcFactory.ProtoNode, node_idx: usize, depth: usize, prefix: []const u8, max_depth: ?usize) !void { const node = nodes[node_idx]; - const hex_root = std.fmt.allocPrint(allocator, "0x{s}", .{std.fmt.fmtSliceHexLower(node.blockRoot[0..4])}) catch return; + const hex_root = try std.fmt.allocPrint(allocator, "0x{s}", .{std.fmt.fmtSliceHexLower(node.blockRoot[0..4])}); defer allocator.free(hex_root); - const node_line = std.fmt.allocPrint(allocator, "{s}{s} ({d})", .{ prefix, hex_root, node.slot }) catch return; + const node_line = try std.fmt.allocPrint(allocator, "{s}{s} ({d})", .{ prefix, hex_root, node.slot }); defer allocator.free(node_line); try tree_lines.appendSlice(node_line); @@ -39,7 +39,7 @@ fn visualizeTreeBranch(allocator: Allocator, tree_lines: *std.ArrayList(u8), nod // Check if we've reached the maximum depth if (max_depth) |max| { if (depth >= max) { - const truncated_comment = std.fmt.allocPrint(allocator, " // ... (truncated at depth {d})", .{max}) catch return; + const truncated_comment = try std.fmt.allocPrint(allocator, " // ... (truncated at depth {d})", .{max}); defer allocator.free(truncated_comment); try tree_lines.appendSlice(truncated_comment); try tree_lines.append('\n'); @@ -59,7 +59,7 @@ fn visualizeTreeBranch(allocator: Allocator, tree_lines: *std.ArrayList(u8), nod } if (children.items.len > 0) { - const child_count_comment = std.fmt.allocPrint(allocator, " // has {d} child branch{s}", .{ children.items.len, if (children.items.len == 1) "" else "es" }) catch return; + const child_count_comment = try std.fmt.allocPrint(allocator, " // has {d} child branch{s}", .{ children.items.len, if (children.items.len == 1) "" else "es" }); defer allocator.free(child_count_comment); try tree_lines.appendSlice(child_count_comment); } @@ -69,12 +69,12 @@ fn visualizeTreeBranch(allocator: Allocator, tree_lines: *std.ArrayList(u8), nod const child_node = nodes[child_idx]; const is_last_child = child_i == children.items.len - 1; - const indent = createTreeIndent(allocator, depth, is_last_child) catch return; + const indent = try createTreeIndent(allocator, depth, is_last_child); defer allocator.free(indent); // Check for missing slots between parent and child if (child_node.slot > node.slot + 1) { - const missing_line = std.fmt.allocPrint(allocator, "{s}[slots {d}..{d}] ─┘ ", .{ indent, node.slot + 1, child_node.slot - 1 }) catch return; + const missing_line = try std.fmt.allocPrint(allocator, "{s}[slots {d}..{d}] ─┘ ", .{ indent, node.slot + 1, child_node.slot - 1 }); defer allocator.free(missing_line); try tree_lines.appendSlice(missing_line); } else { @@ -102,4 +102,3 @@ fn createTreeIndent(allocator: Allocator, depth: usize, is_last_child: bool) ![] return indent.toOwnedSlice(); } - From 6ce1ce771ed0a5be774faf7de6454b996881d817 Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Tue, 23 Sep 2025 11:24:33 -0600 Subject: [PATCH 09/29] replace arrayList with ArrayListUnmanaged --- pkgs/node/src/tree_visualizer.zig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkgs/node/src/tree_visualizer.zig b/pkgs/node/src/tree_visualizer.zig index 2242718f..76b27fe4 100644 --- a/pkgs/node/src/tree_visualizer.zig +++ b/pkgs/node/src/tree_visualizer.zig @@ -4,7 +4,7 @@ const fcFactory = @import("./forkchoice.zig"); /// Builds a tree visualization of the fork choice tree with optional depth limit pub fn buildTreeVisualization(allocator: Allocator, nodes: []const fcFactory.ProtoNode, max_depth: ?usize) ![]const u8 { - var tree_lines = std.ArrayList(u8).init(allocator); + var tree_lines = std.ArrayListUnmanaged(u8).init(allocator); defer tree_lines.deinit(); // Find root nodes (nodes with no parent) From eb12779fa5e0c69ea39fb8b020e750ecb83ea0ee Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Tue, 23 Sep 2025 11:31:05 -0600 Subject: [PATCH 10/29] Remove // comments from visualizer logs --- pkgs/node/src/tree_visualizer.zig | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkgs/node/src/tree_visualizer.zig b/pkgs/node/src/tree_visualizer.zig index 76b27fe4..df08c331 100644 --- a/pkgs/node/src/tree_visualizer.zig +++ b/pkgs/node/src/tree_visualizer.zig @@ -39,7 +39,7 @@ fn visualizeTreeBranch(allocator: Allocator, tree_lines: *std.ArrayList(u8), nod // Check if we've reached the maximum depth if (max_depth) |max| { if (depth >= max) { - const truncated_comment = try std.fmt.allocPrint(allocator, " // ... (truncated at depth {d})", .{max}); + const truncated_comment = try std.fmt.allocPrint(allocator, " ... (truncated at depth {d})", .{max}); defer allocator.free(truncated_comment); try tree_lines.appendSlice(truncated_comment); try tree_lines.append('\n'); @@ -59,7 +59,7 @@ fn visualizeTreeBranch(allocator: Allocator, tree_lines: *std.ArrayList(u8), nod } if (children.items.len > 0) { - const child_count_comment = try std.fmt.allocPrint(allocator, " // has {d} child branch{s}", .{ children.items.len, if (children.items.len == 1) "" else "es" }); + const child_count_comment = try std.fmt.allocPrint(allocator, " has {d} child branch{s}", .{ children.items.len, if (children.items.len == 1) "" else "es" }); defer allocator.free(child_count_comment); try tree_lines.appendSlice(child_count_comment); } From a2847dc2050c0ea3e48823155f4c937f9fd725d0 Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Tue, 23 Sep 2025 11:32:36 -0600 Subject: [PATCH 11/29] Simplify slot log --- pkgs/node/src/tree_visualizer.zig | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkgs/node/src/tree_visualizer.zig b/pkgs/node/src/tree_visualizer.zig index df08c331..8cf273b4 100644 --- a/pkgs/node/src/tree_visualizer.zig +++ b/pkgs/node/src/tree_visualizer.zig @@ -74,7 +74,11 @@ fn visualizeTreeBranch(allocator: Allocator, tree_lines: *std.ArrayList(u8), nod // Check for missing slots between parent and child if (child_node.slot > node.slot + 1) { - const missing_line = try std.fmt.allocPrint(allocator, "{s}[slots {d}..{d}] ─┘ ", .{ indent, node.slot + 1, child_node.slot - 1 }); + const missing_slots = child_node.slot - node.slot - 1; + const missing_line = if (missing_slots == 1) + try std.fmt.allocPrint(allocator, "{s}[{d}] ─┘ ", .{ indent, node.slot + 1 }) + else + try std.fmt.allocPrint(allocator, "{s}[{d}..{d}] ─┘ ", .{ indent, node.slot + 1, child_node.slot - 1 }); defer allocator.free(missing_line); try tree_lines.appendSlice(missing_line); } else { From 49b6f358abd746d741a5e6d16fdad0c9ff281a3c Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Tue, 23 Sep 2025 11:34:14 -0600 Subject: [PATCH 12/29] Remove spacing before brackets --- pkgs/node/src/tree_visualizer.zig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkgs/node/src/tree_visualizer.zig b/pkgs/node/src/tree_visualizer.zig index 8cf273b4..a6b26661 100644 --- a/pkgs/node/src/tree_visualizer.zig +++ b/pkgs/node/src/tree_visualizer.zig @@ -31,7 +31,7 @@ fn visualizeTreeBranch(allocator: Allocator, tree_lines: *std.ArrayList(u8), nod const hex_root = try std.fmt.allocPrint(allocator, "0x{s}", .{std.fmt.fmtSliceHexLower(node.blockRoot[0..4])}); defer allocator.free(hex_root); - const node_line = try std.fmt.allocPrint(allocator, "{s}{s} ({d})", .{ prefix, hex_root, node.slot }); + const node_line = try std.fmt.allocPrint(allocator, "{s}{s}({d})", .{ prefix, hex_root, node.slot }); defer allocator.free(node_line); try tree_lines.appendSlice(node_line); From 75caca7a6cce5bf7b96f705d2b705aaccbdfc6e1 Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Tue, 23 Sep 2025 11:34:41 -0600 Subject: [PATCH 13/29] Remove 0x prefix --- pkgs/node/src/tree_visualizer.zig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkgs/node/src/tree_visualizer.zig b/pkgs/node/src/tree_visualizer.zig index a6b26661..067eb5db 100644 --- a/pkgs/node/src/tree_visualizer.zig +++ b/pkgs/node/src/tree_visualizer.zig @@ -28,7 +28,7 @@ pub fn buildTreeVisualization(allocator: Allocator, nodes: []const fcFactory.Pro /// Recursively builds a tree branch visualization fn visualizeTreeBranch(allocator: Allocator, tree_lines: *std.ArrayList(u8), nodes: []const fcFactory.ProtoNode, node_idx: usize, depth: usize, prefix: []const u8, max_depth: ?usize) !void { const node = nodes[node_idx]; - const hex_root = try std.fmt.allocPrint(allocator, "0x{s}", .{std.fmt.fmtSliceHexLower(node.blockRoot[0..4])}); + const hex_root = try std.fmt.allocPrint(allocator, "{s}", .{std.fmt.fmtSliceHexLower(node.blockRoot[0..4])}); defer allocator.free(hex_root); const node_line = try std.fmt.allocPrint(allocator, "{s}{s}({d})", .{ prefix, hex_root, node.slot }); From a7a7102bd430366df9e0f8c9e95d72c931e9f804 Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Tue, 23 Sep 2025 11:35:09 -0600 Subject: [PATCH 14/29] Use two digits for root --- pkgs/node/src/tree_visualizer.zig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkgs/node/src/tree_visualizer.zig b/pkgs/node/src/tree_visualizer.zig index 067eb5db..9d99a3ee 100644 --- a/pkgs/node/src/tree_visualizer.zig +++ b/pkgs/node/src/tree_visualizer.zig @@ -28,7 +28,7 @@ pub fn buildTreeVisualization(allocator: Allocator, nodes: []const fcFactory.Pro /// Recursively builds a tree branch visualization fn visualizeTreeBranch(allocator: Allocator, tree_lines: *std.ArrayList(u8), nodes: []const fcFactory.ProtoNode, node_idx: usize, depth: usize, prefix: []const u8, max_depth: ?usize) !void { const node = nodes[node_idx]; - const hex_root = try std.fmt.allocPrint(allocator, "{s}", .{std.fmt.fmtSliceHexLower(node.blockRoot[0..4])}); + const hex_root = try std.fmt.allocPrint(allocator, "{s}", .{std.fmt.fmtSliceHexLower(node.blockRoot[0..2])}); defer allocator.free(hex_root); const node_line = try std.fmt.allocPrint(allocator, "{s}{s}({d})", .{ prefix, hex_root, node.slot }); From 628f5fe18b1f86fe08c357ddbffa4ac3956557b0 Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Tue, 30 Sep 2025 14:08:32 -0600 Subject: [PATCH 15/29] fix ArrayListUnmanaged usage --- pkgs/node/src/tree_visualizer.zig | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/pkgs/node/src/tree_visualizer.zig b/pkgs/node/src/tree_visualizer.zig index 9d99a3ee..e02e6893 100644 --- a/pkgs/node/src/tree_visualizer.zig +++ b/pkgs/node/src/tree_visualizer.zig @@ -4,8 +4,8 @@ const fcFactory = @import("./forkchoice.zig"); /// Builds a tree visualization of the fork choice tree with optional depth limit pub fn buildTreeVisualization(allocator: Allocator, nodes: []const fcFactory.ProtoNode, max_depth: ?usize) ![]const u8 { - var tree_lines = std.ArrayListUnmanaged(u8).init(allocator); - defer tree_lines.deinit(); + var tree_lines = std.ArrayListUnmanaged(u8){}; + defer tree_lines.deinit(allocator); // Find root nodes (nodes with no parent) var root_indices = std.ArrayList(usize).init(allocator); @@ -22,11 +22,11 @@ pub fn buildTreeVisualization(allocator: Allocator, nodes: []const fcFactory.Pro try visualizeTreeBranch(allocator, &tree_lines, nodes, root_idx, 0, "", max_depth); } - return tree_lines.toOwnedSlice(); + return tree_lines.toOwnedSlice(allocator); } /// Recursively builds a tree branch visualization -fn visualizeTreeBranch(allocator: Allocator, tree_lines: *std.ArrayList(u8), nodes: []const fcFactory.ProtoNode, node_idx: usize, depth: usize, prefix: []const u8, max_depth: ?usize) !void { +fn visualizeTreeBranch(allocator: Allocator, tree_lines: *std.ArrayListUnmanaged(u8), nodes: []const fcFactory.ProtoNode, node_idx: usize, depth: usize, prefix: []const u8, max_depth: ?usize) !void { const node = nodes[node_idx]; const hex_root = try std.fmt.allocPrint(allocator, "{s}", .{std.fmt.fmtSliceHexLower(node.blockRoot[0..2])}); defer allocator.free(hex_root); @@ -34,15 +34,15 @@ fn visualizeTreeBranch(allocator: Allocator, tree_lines: *std.ArrayList(u8), nod const node_line = try std.fmt.allocPrint(allocator, "{s}{s}({d})", .{ prefix, hex_root, node.slot }); defer allocator.free(node_line); - try tree_lines.appendSlice(node_line); + try tree_lines.appendSlice(allocator, node_line); // Check if we've reached the maximum depth if (max_depth) |max| { if (depth >= max) { const truncated_comment = try std.fmt.allocPrint(allocator, " ... (truncated at depth {d})", .{max}); defer allocator.free(truncated_comment); - try tree_lines.appendSlice(truncated_comment); - try tree_lines.append('\n'); + try tree_lines.appendSlice(allocator, truncated_comment); + try tree_lines.append(allocator, '\n'); return; } } @@ -61,9 +61,9 @@ fn visualizeTreeBranch(allocator: Allocator, tree_lines: *std.ArrayList(u8), nod if (children.items.len > 0) { const child_count_comment = try std.fmt.allocPrint(allocator, " has {d} child branch{s}", .{ children.items.len, if (children.items.len == 1) "" else "es" }); defer allocator.free(child_count_comment); - try tree_lines.appendSlice(child_count_comment); + try tree_lines.appendSlice(allocator, child_count_comment); } - try tree_lines.append('\n'); + try tree_lines.append(allocator, '\n'); for (children.items, 0..) |child_idx, child_i| { const child_node = nodes[child_idx]; @@ -80,9 +80,9 @@ fn visualizeTreeBranch(allocator: Allocator, tree_lines: *std.ArrayList(u8), nod else try std.fmt.allocPrint(allocator, "{s}[{d}..{d}] ─┘ ", .{ indent, node.slot + 1, child_node.slot - 1 }); defer allocator.free(missing_line); - try tree_lines.appendSlice(missing_line); + try tree_lines.appendSlice(allocator, missing_line); } else { - try tree_lines.appendSlice(indent); + try tree_lines.appendSlice(allocator, indent); } // Recursively process child From 74510319705e1964b6489347a33cb2d83cd02184 Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Fri, 3 Oct 2025 19:07:37 -0600 Subject: [PATCH 16/29] fix build error --- pkgs/node/src/node.zig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkgs/node/src/node.zig b/pkgs/node/src/node.zig index 781508db..55c315d6 100644 --- a/pkgs/node/src/node.zig +++ b/pkgs/node/src/node.zig @@ -166,7 +166,7 @@ pub const BeamNode = struct { const interval = @mod(itime_intervals, constants.INTERVALS_PER_SLOT); if (interval == 1) { - self.chain.printSlot(islot, self.connected_peers.count()); + self.chain.printSlot(islot, null, self.connected_peers.count()); } return; } From eeb779195657b91b787043ab1b5c044d18cb56c8 Mon Sep 17 00:00:00 2001 From: Chetany Bhardwaj Date: Wed, 10 Dec 2025 21:42:42 +0530 Subject: [PATCH 17/29] feature: add node graph visualization for forkchoice --- pkgs/cli/src/api_server.zig | 190 ++++++++++++++++++++++++++++++++++++ pkgs/cli/src/main.zig | 3 + pkgs/cli/src/node.zig | 5 + pkgs/node/src/lib.zig | 3 + 4 files changed, 201 insertions(+) diff --git a/pkgs/cli/src/api_server.zig b/pkgs/cli/src/api_server.zig index 792128a1..3311015c 100644 --- a/pkgs/cli/src/api_server.zig +++ b/pkgs/cli/src/api_server.zig @@ -2,6 +2,25 @@ const std = @import("std"); const api = @import("@zeam/api"); const constants = @import("constants.zig"); const event_broadcaster = api.event_broadcaster; +const node = @import("@zeam/node"); + +// Global chain reference for API access +var global_chain: ?*node.BeamChain = null; +var chain_mutex = std.Thread.Mutex{}; + +/// Register the chain for API access +pub fn registerChain(chain: *node.BeamChain) void { + chain_mutex.lock(); + defer chain_mutex.unlock(); + global_chain = chain; +} + +/// Get the global chain reference +fn getChain() ?*node.BeamChain { + chain_mutex.lock(); + defer chain_mutex.unlock(); + return global_chain; +} /// Simple metrics server that runs in a background thread pub fn startAPIServer(allocator: std.mem.Allocator, port: u16) !void { @@ -63,11 +82,182 @@ fn handleConnection(connection: std.net.Server.Connection, allocator: std.mem.Al .{ .name = "content-type", .value = "application/json; charset=utf-8" }, }, }) catch {}; + } else if (std.mem.startsWith(u8, request.head.target, "/api/forkchoice/graph")) { + // Handle fork choice graph request + handleForkChoiceGraph(&request, allocator) catch |err| { + std.log.warn("Fork choice graph request failed: {}", .{err}); + _ = request.respond("Internal Server Error\n", .{}) catch {}; + }; } else { _ = request.respond("Not Found\n", .{ .status = .not_found }) catch {}; } } +/// Handle fork choice graph API request +fn handleForkChoiceGraph(request: *std.http.Server.Request, allocator: std.mem.Allocator) !void { + // Get chain reference + const chain = getChain() orelse { + const error_response = "{\"error\":\"Chain not initialized\"}"; + _ = request.respond(error_response, .{ + .extra_headers = &.{ + .{ .name = "content-type", .value = "application/json; charset=utf-8" }, + }, + }) catch {}; + return; + }; + + // Parse query parameters for max_slots (default: 50) + var max_slots: usize = 50; + if (std.mem.indexOf(u8, request.head.target, "?slots=")) |query_start| { + const slots_param = request.head.target[query_start + 7 ..]; + if (std.mem.indexOf(u8, slots_param, "&")) |end| { + max_slots = std.fmt.parseInt(usize, slots_param[0..end], 10) catch 50; + } else { + max_slots = std.fmt.parseInt(usize, slots_param, 10) catch 50; + } + } + + // Build the graph data + var graph_json = std.ArrayList(u8).init(allocator); + defer graph_json.deinit(); + + try buildGraphJSON(chain, graph_json.writer(), max_slots, allocator); + + // Send response + _ = request.respond(graph_json.items, .{ + .extra_headers = &.{ + .{ .name = "content-type", .value = "application/json; charset=utf-8" }, + .{ .name = "access-control-allow-origin", .value = "*" }, + }, + }) catch {}; +} + +/// Build fork choice graph in Grafana node-graph JSON format +fn buildGraphJSON( + chain: *node.BeamChain, + writer: anytype, + max_slots: usize, + allocator: std.mem.Allocator, +) !void { + const fork_choice = &chain.forkChoice; + const proto_nodes = fork_choice.protoArray.nodes.items; + + // Determine the slot threshold (show only recent slots) + const current_slot = fork_choice.head.slot; + const min_slot = if (current_slot > max_slots) current_slot - max_slots else 0; + + // Build nodes and edges + var nodes_list = std.ArrayList(u8).init(allocator); + defer nodes_list.deinit(); + var edges_list = std.ArrayList(u8).init(allocator); + defer edges_list.deinit(); + + var node_count: usize = 0; + var edge_count: usize = 0; + + // Find max weight for normalization + var max_weight: isize = 1; + for (proto_nodes) |pnode| { + if (pnode.slot >= min_slot and pnode.weight > max_weight) { + max_weight = pnode.weight; + } + } + + // Build nodes + for (proto_nodes, 0..) |pnode, idx| { + if (pnode.slot < min_slot) continue; + + // Determine node role and color + const is_head = std.mem.eql(u8, &pnode.blockRoot, &fork_choice.head.blockRoot); + const is_justified = std.mem.eql(u8, &pnode.blockRoot, &fork_choice.fcStore.latest_justified.root); + const is_finalized = std.mem.eql(u8, &pnode.blockRoot, &fork_choice.fcStore.latest_finalized.root); + + const role = if (is_finalized) + "finalized" + else if (is_justified) + "justified" + else if (is_head) + "head" + else + "normal"; + + // Direct HTML color strings - no threshold mapping needed! + const color_str = if (is_finalized) + "#9B59B6" // Finalized - purple + else if (is_justified) + "#3498DB" // Justified - blue + else if (is_head) + "#F39C12" // Head - gold/orange + else if (pnode.timeliness) + "#2ECC71" // Timely - green + else + "#FFA500"; // Non-timely - orange + + // Normalized weight for arc + const arc_weight: f64 = if (max_weight > 0) + @as(f64, @floatFromInt(pnode.weight)) / @as(f64, @floatFromInt(max_weight)) + else + 0.0; + + // Block root as hex + const hex_prefix = try std.fmt.allocPrint(allocator, "{s}", .{std.fmt.fmtSliceHexLower(pnode.blockRoot[0..4])}); + defer allocator.free(hex_prefix); + const full_root = try std.fmt.allocPrint(allocator, "{s}", .{std.fmt.fmtSliceHexLower(&pnode.blockRoot)}); + defer allocator.free(full_root); + + if (node_count > 0) { + try nodes_list.appendSlice(","); + } + + try std.fmt.format(nodes_list.writer(), + \\{{"id":"{s}","title":"Slot {d}","mainStat":"{d}","secondaryStat":"{d}","arc__weight":{d:.2},"color":"{s}","detail__role":"{s}","detail__timely":{},"detail__hex_prefix":"{s}"}} + , .{ + full_root, + pnode.slot, + pnode.weight, + pnode.slot, + arc_weight, + color_str, + role, + pnode.timeliness, + hex_prefix, + }); + + node_count += 1; + + // Build edges (parent -> child relationships) + if (pnode.parent) |parent_idx| { + const parent_node = proto_nodes[parent_idx]; + if (parent_node.slot >= min_slot) { + const parent_root = try std.fmt.allocPrint(allocator, "{s}", .{std.fmt.fmtSliceHexLower(&parent_node.blockRoot)}); + defer allocator.free(parent_root); + + const is_best_child = if (parent_node.bestChild) |bc| bc == idx else false; + + if (edge_count > 0) { + try edges_list.appendSlice(","); + } + + try std.fmt.format(edges_list.writer(), + \\{{"id":"edge_{d}","source":"{s}","target":"{s}","mainStat":"","detail__is_best_child":{}}} + , .{ + edge_count, + parent_root, + full_root, + is_best_child, + }); + + edge_count += 1; + } + } + } + + // Write final JSON + try std.fmt.format(writer, + \\{{"nodes":[{s}],"edges":[{s}]}} + , .{ nodes_list.items, edges_list.items }); +} + /// Simple metrics server context const SimpleMetricsServer = struct { allocator: std.mem.Allocator, diff --git a/pkgs/cli/src/main.zig b/pkgs/cli/src/main.zig index 4194f654..dd16da1b 100644 --- a/pkgs/cli/src/main.zig +++ b/pkgs/cli/src/main.zig @@ -450,6 +450,9 @@ fn mainInner() !void { .logger_config = &logger1_config, }); + // Register beam_node_1's chain for fork choice graph API + api_server.registerChain(beam_node_1.chain); + var beam_node_2: BeamNode = undefined; try beam_node_2.init(allocator, .{ // options diff --git a/pkgs/cli/src/node.zig b/pkgs/cli/src/node.zig index 1c2bf9f6..2edaba48 100644 --- a/pkgs/cli/src/node.zig +++ b/pkgs/cli/src/node.zig @@ -171,6 +171,11 @@ pub const Node = struct { .logger_config = options.logger_config, }); + // Register the chain with the API server for fork choice graph endpoint + if (options.metrics_enable) { + api_server.registerChain(self.beam_node.chain); + } + self.logger = options.logger_config.logger(.node); } diff --git a/pkgs/node/src/lib.zig b/pkgs/node/src/lib.zig index 67a01a75..8cda0b2f 100644 --- a/pkgs/node/src/lib.zig +++ b/pkgs/node/src/lib.zig @@ -4,6 +4,9 @@ pub const Clock = clockFactory.Clock; const nodeFactory = @import("./node.zig"); pub const BeamNode = nodeFactory.BeamNode; +const chainFactory = @import("./chain.zig"); +pub const BeamChain = chainFactory.BeamChain; + pub const fcFactory = @import("./forkchoice.zig"); pub const constants = @import("./constants.zig"); From 47b8c7ec0f48ea29dcbedec8433b1cd6b47ab61b Mon Sep 17 00:00:00 2001 From: Chetany Bhardwaj Date: Fri, 12 Dec 2025 19:27:06 +0530 Subject: [PATCH 18/29] feat: add validator weight based arcs --- pkgs/cli/src/api_server.zig | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/pkgs/cli/src/api_server.zig b/pkgs/cli/src/api_server.zig index 3311015c..5841ae24 100644 --- a/pkgs/cli/src/api_server.zig +++ b/pkgs/cli/src/api_server.zig @@ -181,24 +181,22 @@ fn buildGraphJSON( else "normal"; - // Direct HTML color strings - no threshold mapping needed! - const color_str = if (is_finalized) - "#9B59B6" // Finalized - purple - else if (is_justified) - "#3498DB" // Justified - blue - else if (is_head) - "#F39C12" // Head - gold/orange - else if (pnode.timeliness) - "#2ECC71" // Timely - green - else - "#FFA500"; // Non-timely - orange - - // Normalized weight for arc + // Normalized weight for arc (0.0 to 1.0, draws partial circle border) + // Represents fraction of circle filled (0.5 = half circle, 1.0 = full circle) const arc_weight: f64 = if (max_weight > 0) @as(f64, @floatFromInt(pnode.weight)) / @as(f64, @floatFromInt(max_weight)) else 0.0; + // Use separate arc fields for each color (only one is set per node, others are 0) + // This allows manual arc section configuration with explicit colors + // TODO: Use chain.forkChoice.isBlockTimely(blockDelayMs) once implemented + // For now, treat all non-finalized/non-justified/non-head blocks as timely + const arc_timely: f64 = if (!is_finalized and !is_justified and !is_head) arc_weight else 0.0; + const arc_head: f64 = if (is_head) arc_weight else 0.0; + const arc_justified: f64 = if (is_justified) arc_weight else 0.0; + const arc_finalized: f64 = if (is_finalized) arc_weight else 0.0; + // Block root as hex const hex_prefix = try std.fmt.allocPrint(allocator, "{s}", .{std.fmt.fmtSliceHexLower(pnode.blockRoot[0..4])}); defer allocator.free(hex_prefix); @@ -210,16 +208,17 @@ fn buildGraphJSON( } try std.fmt.format(nodes_list.writer(), - \\{{"id":"{s}","title":"Slot {d}","mainStat":"{d}","secondaryStat":"{d}","arc__weight":{d:.2},"color":"{s}","detail__role":"{s}","detail__timely":{},"detail__hex_prefix":"{s}"}} + \\{{"id":"{s}","title":"Slot {d}","mainStat":"{d}","secondaryStat":"{d}","arc__timely":{d:.4},"arc__head":{d:.4},"arc__justified":{d:.4},"arc__finalized":{d:.4},"detail__role":"{s}","detail__hex_prefix":"{s}"}} , .{ full_root, pnode.slot, pnode.weight, pnode.slot, - arc_weight, - color_str, + arc_timely, + arc_head, + arc_justified, + arc_finalized, role, - pnode.timeliness, hex_prefix, }); From 6e7d1435c573ba1e74e81173dd448b5fe112260e Mon Sep 17 00:00:00 2001 From: Chetany Bhardwaj Date: Sat, 13 Dec 2025 08:40:05 +0530 Subject: [PATCH 19/29] refactor: make forkchoice thread-safe and optimize graph api --- pkgs/cli/src/api_server.zig | 43 ++++++- pkgs/node/src/chain.zig | 66 +++++----- pkgs/node/src/forkchoice.zig | 238 +++++++++++++++++++++++++++++++---- 3 files changed, 281 insertions(+), 66 deletions(-) diff --git a/pkgs/cli/src/api_server.zig b/pkgs/cli/src/api_server.zig index 5841ae24..e8a5a384 100644 --- a/pkgs/cli/src/api_server.zig +++ b/pkgs/cli/src/api_server.zig @@ -139,11 +139,14 @@ fn buildGraphJSON( max_slots: usize, allocator: std.mem.Allocator, ) !void { - const fork_choice = &chain.forkChoice; - const proto_nodes = fork_choice.protoArray.nodes.items; + // Thread-safe snapshot - lock held only during copy + const snapshot = try chain.forkChoice.snapshot(allocator); + defer snapshot.deinit(allocator); + + const proto_nodes = snapshot.nodes; // Determine the slot threshold (show only recent slots) - const current_slot = fork_choice.head.slot; + const current_slot = snapshot.head.slot; const min_slot = if (current_slot > max_slots) current_slot - max_slots else 0; // Build nodes and edges @@ -164,13 +167,41 @@ fn buildGraphJSON( } // Build nodes + // Find the finalized node index to check ancestry + const finalized_idx = blk: { + for (proto_nodes, 0..) |n, i| { + if (std.mem.eql(u8, &n.blockRoot, &snapshot.latest_finalized_root)) { + break :blk i; + } + } + break :blk null; + }; + for (proto_nodes, 0..) |pnode, idx| { if (pnode.slot < min_slot) continue; // Determine node role and color - const is_head = std.mem.eql(u8, &pnode.blockRoot, &fork_choice.head.blockRoot); - const is_justified = std.mem.eql(u8, &pnode.blockRoot, &fork_choice.fcStore.latest_justified.root); - const is_finalized = std.mem.eql(u8, &pnode.blockRoot, &fork_choice.fcStore.latest_finalized.root); + const is_head = std.mem.eql(u8, &pnode.blockRoot, &snapshot.head.blockRoot); + const is_justified = std.mem.eql(u8, &pnode.blockRoot, &snapshot.latest_justified_root); + + // A block is finalized if: + // 1. It equals the finalized checkpoint, OR + // 2. The finalized block is a descendant of it (block is ancestor of finalized) + const is_finalized = blk: { + // Check if this block IS the finalized block + if (std.mem.eql(u8, &pnode.blockRoot, &snapshot.latest_finalized_root)) { + break :blk true; + } + // Check if this block is an ancestor of the finalized block + if (finalized_idx) |fin_idx| { + var current_idx: ?usize = fin_idx; + while (current_idx) |curr| { + if (curr == idx) break :blk true; + current_idx = proto_nodes[curr].parent; + } + } + break :blk false; + }; const role = if (is_finalized) "finalized" diff --git a/pkgs/node/src/chain.zig b/pkgs/node/src/chain.zig index 77f64deb..5c9dfeed 100644 --- a/pkgs/node/src/chain.zig +++ b/pkgs/node/src/chain.zig @@ -249,7 +249,7 @@ pub const BeamChain = struct { const slot = opts.slot; // const head = try self.forkChoice.getProposalHead(slot); - const head_proto = self.forkChoice.head; + const head_proto = self.forkChoice.getHead(); const head: types.Checkpoint = .{ .root = head_proto.blockRoot, .slot = head_proto.slot, @@ -260,7 +260,7 @@ pub const BeamChain = struct { .slot = slot, .head = head, .target = target, - .source = self.forkChoice.fcStore.latest_justified, + .source = self.forkChoice.getLatestJustified(), }; return attestation_data; @@ -276,21 +276,27 @@ pub const BeamChain = struct { return; } else - self.forkChoice.head; + self.forkChoice.getHead(); // Get additional chain information - const justified = self.forkChoice.fcStore.latest_justified; - const finalized = self.forkChoice.fcStore.latest_finalized; + const justified = self.forkChoice.getLatestJustified(); + const finalized = self.forkChoice.getLatestFinalized(); // Calculate chain progress const slot: usize = if (islot < 0) 0 else @intCast(islot); const blocks_behind = if (slot > fc_head.slot) slot - fc_head.slot else 0; const is_timely = fc_head.timeliness; - // Build tree visualization + // Build tree visualization (thread-safe snapshot) var arena = std.heap.ArenaAllocator.init(self.allocator); defer arena.deinit(); - const tree_visual = tree_visualizer.buildTreeVisualization(arena.allocator(), self.forkChoice.protoArray.nodes.items, tree_depth) catch "Tree visualization failed"; + const tree_visual = blk: { + const snapshot = self.forkChoice.snapshot(arena.allocator()) catch { + break :blk "Failed to get fork choice snapshot"; + }; + defer snapshot.deinit(arena.allocator()); + break :blk tree_visualizer.buildTreeVisualization(arena.allocator(), snapshot.nodes, tree_depth) catch "Tree visualization failed"; + }; self.module_logger.info( \\ @@ -490,9 +496,8 @@ pub const BeamChain = struct { self.module_logger.warn("Failed to create head event: {any}", .{err}); } - const store = self.forkChoice.fcStore; - const latest_justified = store.latest_justified; - const latest_finalized = store.latest_finalized; + const latest_justified = self.forkChoice.getLatestJustified(); + const latest_finalized = self.forkChoice.getLatestFinalized(); // 7. Save block and state to database self.updateBlockDb(signedBlock, fcBlock.blockRoot, post_state.*, block.slot, latest_finalized.slot) catch |err| { @@ -588,8 +593,8 @@ pub const BeamChain = struct { /// Process finalization advancement: move canonical blocks to finalized index and cleanup unfinalized indices fn processFinalizationAdvancement(self: *Self, batch: *database.Db.WriteBatch, previousFinalizedSlot: types.Slot, finalizedSlot: types.Slot) !void { - // 1. Fetch all newly finalized roots - const current_finalized = self.forkChoice.fcStore.latest_finalized.root; + // 1. Fetch all newly finalized roots (thread-safe) + const current_finalized = self.forkChoice.getLatestFinalized().root; const newly_finalized_roots = try self.forkChoice.getAncestorsOfFinalized(self.allocator, current_finalized, previousFinalizedSlot); defer self.allocator.free(newly_finalized_roots); @@ -606,14 +611,13 @@ pub const BeamChain = struct { canonical_blocks.put(root, {}) catch {}; } - // 2. Put all newly finalized roots in DbFinalizedSlotsNamespace + // 2. Put all newly finalized roots in DbFinalizedSlotsNamespace (thread-safe) for (newly_finalized_roots) |root| { - const idx = self.forkChoice.protoArray.indices.get(root) orelse return error.FinalizedBlockNotInForkChoice; - const node = self.forkChoice.protoArray.nodes.items[idx]; - batch.putFinalizedSlotIndex(database.DbFinalizedSlotsNamespace, node.slot, root); + const slot = self.forkChoice.getBlockSlot(root) orelse return error.FinalizedBlockNotInForkChoice; + batch.putFinalizedSlotIndex(database.DbFinalizedSlotsNamespace, slot, root); self.module_logger.debug("Added block 0x{s} at slot {d} to finalized index", .{ std.fmt.fmtSliceHexLower(&root), - node.slot, + slot, }); } @@ -676,31 +680,27 @@ pub const BeamChain = struct { pub fn validateAttestation(self: *Self, attestation: types.Attestation, is_from_block: bool) !void { const data = attestation.data; - // 1. Validate that source, target, and head blocks exist in proto array - const source_idx = self.forkChoice.protoArray.indices.get(data.source.root) orelse { + // 1. Validate that source, target, and head blocks exist in proto array (thread-safe) + const source_block = self.forkChoice.getProtoNode(data.source.root) orelse { self.module_logger.debug("Attestation validation failed: unknown source block root=0x{s}", .{ std.fmt.fmtSliceHexLower(&data.source.root), }); return AttestationValidationError.UnknownSourceBlock; }; - const target_idx = self.forkChoice.protoArray.indices.get(data.target.root) orelse { + const target_block = self.forkChoice.getProtoNode(data.target.root) orelse { self.module_logger.debug("Attestation validation failed: unknown target block root=0x{s}", .{ std.fmt.fmtSliceHexLower(&data.target.root), }); return AttestationValidationError.UnknownTargetBlock; }; - const head_idx = self.forkChoice.protoArray.indices.get(data.head.root) orelse { + const head_block = self.forkChoice.getProtoNode(data.head.root) orelse { self.module_logger.debug("Attestation validation failed: unknown head block root=0x{s}", .{ std.fmt.fmtSliceHexLower(&data.head.root), }); return AttestationValidationError.UnknownHeadBlock; }; - - const source_block = self.forkChoice.protoArray.nodes.items[source_idx]; - const target_block = self.forkChoice.protoArray.nodes.items[target_idx]; - const head_block = self.forkChoice.protoArray.nodes.items[head_idx]; _ = head_block; // Will be used in future validations // 2. Validate slot relationships @@ -744,7 +744,7 @@ pub const BeamChain = struct { // Gossip attestations must be for current or past slots only. Validators attest // in interval 1 of the current slot, so they cannot attest for future slots. // Block attestations can be more lenient since the block itself was validated. - const current_slot = self.forkChoice.fcStore.timeSlots; + const current_slot = self.forkChoice.getCurrentSlot(); const max_allowed_slot = if (is_from_block) current_slot + constants.MAX_FUTURE_SLOT_TOLERANCE // Block attestations: allow +1 else @@ -781,8 +781,8 @@ pub const BeamChain = struct { } pub fn getStatus(self: *Self) types.Status { - const finalized = self.forkChoice.fcStore.latest_finalized; - const head = self.forkChoice.head; + const finalized = self.forkChoice.getLatestFinalized(); + const head = self.forkChoice.getHead(); return .{ .finalized_root = finalized.root, @@ -847,9 +847,9 @@ test "process and add mock blocks into a node's chain" { var beam_chain = try BeamChain.init(allocator, ChainOpts{ .config = chain_config, .anchorState = &beam_state, .nodeId = nodeId, .logger_config = &zeam_logger_config, .db = db }, connected_peers); defer beam_chain.deinit(); - try std.testing.expect(std.mem.eql(u8, &beam_chain.forkChoice.fcStore.latest_finalized.root, &mock_chain.blockRoots[0])); + try std.testing.expect(std.mem.eql(u8, &beam_chain.forkChoice.getLatestFinalized().root, &mock_chain.blockRoots[0])); try std.testing.expect(beam_chain.forkChoice.protoArray.nodes.items.len == 1); - try std.testing.expect(std.mem.eql(u8, &beam_chain.forkChoice.fcStore.latest_finalized.root, &beam_chain.forkChoice.protoArray.nodes.items[0].blockRoot)); + try std.testing.expect(std.mem.eql(u8, &beam_chain.forkChoice.getLatestFinalized().root, &beam_chain.forkChoice.protoArray.nodes.items[0].blockRoot)); try std.testing.expect(std.mem.eql(u8, mock_chain.blocks[0].message.block.state_root[0..], &beam_chain.forkChoice.protoArray.nodes.items[0].stateRoot)); try std.testing.expect(std.mem.eql(u8, &mock_chain.blockRoots[0], &beam_chain.forkChoice.protoArray.nodes.items[0].blockRoot)); @@ -876,9 +876,9 @@ test "process and add mock blocks into a node's chain" { try std.testing.expect(std.mem.eql(u8, &state_root, &block.state_root)); // fcstore checkpoints should match - try std.testing.expect(std.mem.eql(u8, &beam_chain.forkChoice.fcStore.latest_justified.root, &mock_chain.latestJustified[i].root)); - try std.testing.expect(std.mem.eql(u8, &beam_chain.forkChoice.fcStore.latest_finalized.root, &mock_chain.latestFinalized[i].root)); - try std.testing.expect(std.mem.eql(u8, &beam_chain.forkChoice.head.blockRoot, &mock_chain.latestHead[i].root)); + try std.testing.expect(std.mem.eql(u8, &beam_chain.forkChoice.getLatestJustified().root, &mock_chain.latestJustified[i].root)); + try std.testing.expect(std.mem.eql(u8, &beam_chain.forkChoice.getLatestFinalized().root, &mock_chain.latestFinalized[i].root)); + try std.testing.expect(std.mem.eql(u8, &beam_chain.forkChoice.getHead().blockRoot, &mock_chain.latestHead[i].root)); } const num_validators: usize = @intCast(mock_chain.genesis_config.numValidators()); diff --git a/pkgs/node/src/forkchoice.zig b/pkgs/node/src/forkchoice.zig index 3ab74613..07acfb82 100644 --- a/pkgs/node/src/forkchoice.zig +++ b/pkgs/node/src/forkchoice.zig @@ -1,6 +1,7 @@ const std = @import("std"); const json = std.json; const Allocator = std.mem.Allocator; +const Thread = std.Thread; const ssz = @import("ssz"); const types = @import("@zeam/types"); @@ -99,7 +100,8 @@ pub const ProtoArray = struct { } } - pub fn applyDeltas(self: *Self, deltas: []isize, cutoff_weight: u64) !void { + // Internal unlocked version - assumes caller holds lock + fn applyDeltasUnlocked(self: *Self, deltas: []isize, cutoff_weight: u64) !void { if (deltas.len != self.nodes.items.len) { return ForkChoiceError.InvalidDeltas; } @@ -233,8 +235,22 @@ pub const ForkChoice = struct { // get added deltas: std.ArrayList(isize), logger: zeam_utils.ModuleLogger, + // Thread-safe access protection + mutex: Thread.RwLock, const Self = @This(); + + /// Thread-safe snapshot for observability + pub const Snapshot = struct { + head: ProtoNode, + latest_justified_root: [32]u8, + latest_finalized_root: [32]u8, + nodes: []ProtoNode, + + pub fn deinit(self: Snapshot, allocator: Allocator) void { + allocator.free(self.nodes); + } + }; pub fn init(allocator: Allocator, opts: ForkChoiceParams) !Self { const anchor_block_header = try opts.anchorState.genStateBlockHeader(allocator); var anchor_block_root: [32]u8 = undefined; @@ -274,11 +290,53 @@ pub const ForkChoice = struct { .safeTarget = anchor_block, .deltas = deltas, .logger = opts.logger, + .mutex = Thread.RwLock{}, }; - _ = try fc.updateHead(); + // No lock needed during init - struct not yet accessible to other threads + _ = try fc.updateHeadUnlocked(); return fc; } + /// Thread-safe snapshot for observability + /// Holds shared lock only during copy, caller formats JSON lock-free + pub fn snapshot(self: *Self, allocator: Allocator) !Snapshot { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + + // Quick copy - ProtoNode has no pointer members, shallow copy is safe + const nodes_copy = try allocator.alloc(ProtoNode, self.protoArray.nodes.items.len); + @memcpy(nodes_copy, self.protoArray.nodes.items); + + // Get the full ProtoNode for head from protoArray + const head_idx = self.protoArray.indices.get(self.head.blockRoot) orelse { + // Fallback: create a ProtoNode from ProtoBlock if not found + const head_node = ProtoNode{ + .slot = self.head.slot, + .blockRoot = self.head.blockRoot, + .parentRoot = self.head.parentRoot, + .stateRoot = self.head.stateRoot, + .timeliness = self.head.timeliness, + .parent = null, + .weight = 0, + .bestChild = null, + .bestDescendant = null, + }; + return Snapshot{ + .head = head_node, + .latest_justified_root = self.fcStore.latest_justified.root, + .latest_finalized_root = self.fcStore.latest_finalized.root, + .nodes = nodes_copy, + }; + }; + + return Snapshot{ + .head = self.protoArray.nodes.items[head_idx], + .latest_justified_root = self.fcStore.latest_justified.root, + .latest_finalized_root = self.fcStore.latest_finalized.root, + .nodes = nodes_copy, + }; + } + fn isBlockTimely(self: *Self, blockDelayMs: usize) bool { _ = self; _ = blockDelayMs; @@ -314,7 +372,8 @@ pub const ForkChoice = struct { /// Get all ancestor block roots from the current finalized block, /// traversing backwards, and collecting all blocks with slot > previousFinalizedSlot. /// Stops traversal when previousFinalizedSlot is reached or at genesis. - pub fn getAncestorsOfFinalized(self: *Self, allocator: Allocator, currentFinalized: types.Root, previousFinalizedSlot: types.Slot) ![]types.Root { + // Internal unlocked version - assumes caller holds lock + fn getAncestorsOfFinalizedUnlocked(self: *Self, allocator: Allocator, currentFinalized: types.Root, previousFinalizedSlot: types.Slot) ![]types.Root { var ancestors = std.ArrayList(types.Root).init(allocator); var current_idx_or_null = self.protoArray.indices.get(currentFinalized); @@ -333,7 +392,8 @@ pub const ForkChoice = struct { return ancestors.toOwnedSlice(); } - pub fn tickInterval(self: *Self, hasProposal: bool) !void { + // Internal unlocked version - assumes caller holds lock + fn tickIntervalUnlocked(self: *Self, hasProposal: bool) !void { self.fcStore.time += 1; const currentInterval = self.fcStore.time % constants.INTERVALS_PER_SLOT; @@ -341,28 +401,30 @@ pub const ForkChoice = struct { 0 => { self.fcStore.timeSlots += 1; if (hasProposal) { - _ = try self.acceptNewAttestations(); + _ = try self.acceptNewAttestationsUnlocked(); } }, 1 => {}, 2 => { - _ = try self.updateSafeTarget(); + _ = try self.updateSafeTargetUnlocked(); }, 3 => { - _ = try self.acceptNewAttestations(); + _ = try self.acceptNewAttestationsUnlocked(); }, else => @panic("invalid interval"), } self.logger.debug("forkchoice ticked to time(intervals)={d} slot={d}", .{ self.fcStore.time, self.fcStore.timeSlots }); } - pub fn onInterval(self: *Self, time_intervals: usize, has_proposal: bool) !void { + // Internal unlocked version - assumes caller holds lock + fn onIntervalUnlocked(self: *Self, time_intervals: usize, has_proposal: bool) !void { while (self.fcStore.time < time_intervals) { - try self.tickInterval(has_proposal and (self.fcStore.time + 1) == time_intervals); + try self.tickIntervalUnlocked(has_proposal and (self.fcStore.time + 1) == time_intervals); } } - pub fn acceptNewAttestations(self: *Self) !ProtoBlock { + // Internal unlocked version - assumes caller holds lock + fn acceptNewAttestationsUnlocked(self: *Self) !ProtoBlock { for (0..self.config.genesis.numValidators()) |validator_id| { var attestation_tracker = self.attestations.get(validator_id) orelse AttestationTracker{}; if (attestation_tracker.latestNew) |new_attestation| { @@ -374,19 +436,20 @@ pub const ForkChoice = struct { try self.attestations.put(validator_id, attestation_tracker); } - return self.updateHead(); + return self.updateHeadUnlocked(); } - pub fn getProposalHead(self: *Self, slot: types.Slot) !types.Checkpoint { + // Internal unlocked version - assumes caller holds lock + fn getProposalHeadUnlocked(self: *Self, slot: types.Slot) !types.Checkpoint { const time_intervals = slot * constants.INTERVALS_PER_SLOT; // this could be called independently by the validator when its a separate process // and FC would need to be protected by mutex to make it thread safe but for now // this is deterministally called after the fc has been ticked ahead // so the following call should be a no-op - try self.onInterval(time_intervals, true); + try self.onIntervalUnlocked(time_intervals, true); // accept any new attestations in case previous ontick was a no-op and either the validator // wasn't registered or there have been new attestations - const head = try self.acceptNewAttestations(); + const head = try self.acceptNewAttestationsUnlocked(); return types.Checkpoint{ .root = head.blockRoot, @@ -394,7 +457,8 @@ pub const ForkChoice = struct { }; } - pub fn getProposalAttestations(self: *Self) ![]types.SignedAttestation { + // Internal unlocked version - assumes caller holds lock + fn getProposalAttestationsUnlocked(self: *Self) ![]types.SignedAttestation { var included_attestations = std.ArrayList(types.SignedAttestation).init(self.allocator); const latest_justified = self.fcStore.latest_justified; @@ -415,7 +479,8 @@ pub const ForkChoice = struct { return included_attestations.toOwnedSlice(); } - pub fn getAttestationTarget(self: *Self) !types.Checkpoint { + // Internal unlocked version - assumes caller holds lock + fn getAttestationTargetUnlocked(self: *Self) !types.Checkpoint { var target_idx = self.protoArray.indices.get(self.head.blockRoot) orelse return ForkChoiceError.InvalidHeadIndex; const nodes = self.protoArray.nodes.items; @@ -436,7 +501,8 @@ pub const ForkChoice = struct { }; } - pub fn computeDeltas(self: *Self, from_known: bool) ![]isize { + // Internal unlocked version - assumes caller holds lock + fn computeDeltasUnlocked(self: *Self, from_known: bool) ![]isize { // prep the deltas data structure while (self.deltas.items.len < self.protoArray.nodes.items.len) { try self.deltas.append(0); @@ -469,9 +535,10 @@ pub const ForkChoice = struct { return self.deltas.items; } - pub fn computeFCHead(self: *Self, from_known: bool, cutoff_weight: u64) !ProtoBlock { - const deltas = try self.computeDeltas(from_known); - try self.protoArray.applyDeltas(deltas, cutoff_weight); + // Internal unlocked version - assumes caller holds lock + fn computeFCHeadUnlocked(self: *Self, from_known: bool, cutoff_weight: u64) !ProtoBlock { + const deltas = try self.computeDeltasUnlocked(from_known); + try self.protoArray.applyDeltasUnlocked(deltas, cutoff_weight); // head is the best descendant of latest justified const justified_idx = self.protoArray.indices.get(self.fcStore.latest_justified.root) orelse return ForkChoiceError.InvalidJustifiedRoot; @@ -494,20 +561,23 @@ pub const ForkChoice = struct { return fcHead; } - pub fn updateHead(self: *Self) !ProtoBlock { - self.head = try self.computeFCHead(true, 0); + // Internal unlocked version - assumes caller holds lock + fn updateHeadUnlocked(self: *Self) !ProtoBlock { + self.head = try self.computeFCHeadUnlocked(true, 0); // Update the lean_head_slot metric zeam_metrics.metrics.lean_head_slot.set(self.head.slot); return self.head; } - pub fn updateSafeTarget(self: *Self) !ProtoBlock { + // Internal unlocked version - assumes caller holds lock + fn updateSafeTargetUnlocked(self: *Self) !ProtoBlock { const cutoff_weight = try std.math.divCeil(u64, 2 * self.config.genesis.numValidators(), 3); - self.safeTarget = try self.computeFCHead(false, cutoff_weight); + self.safeTarget = try self.computeFCHeadUnlocked(false, cutoff_weight); return self.safeTarget; } - pub fn onAttestation(self: *Self, signed_attestation: types.SignedAttestation, is_from_block: bool) !void { + // Internal unlocked version - assumes caller holds lock + fn onAttestationUnlocked(self: *Self, signed_attestation: types.SignedAttestation, is_from_block: bool) !void { // Attestation validation is done by the caller (chain layer) // This function assumes the attestation has already been validated @@ -553,7 +623,8 @@ pub const ForkChoice = struct { } // we process state outside forkchoice onblock to parallize verifications and just use the post state here - pub fn onBlock(self: *Self, block: types.BeamBlock, state: *const types.BeamState, opts: OnBlockOpts) !ProtoBlock { + // Internal unlocked version - assumes caller holds lock + fn onBlockUnlocked(self: *Self, block: types.BeamBlock, state: *const types.BeamState, opts: OnBlockOpts) !ProtoBlock { const parent_root = block.parent_root; const slot = block.slot; @@ -600,7 +671,8 @@ pub const ForkChoice = struct { } } - pub fn hasBlock(self: *Self, blockRoot: types.Root) bool { + // Internal unlocked version - assumes caller holds lock + fn hasBlockUnlocked(self: *Self, blockRoot: types.Root) bool { const block_or_null = self.protoArray.getBlock(blockRoot); if (block_or_null) |_| { return true; @@ -608,6 +680,118 @@ pub const ForkChoice = struct { return false; } + + // PUBLIC API - LOCK AT BOUNDARY + // These methods acquire locks and delegate to unlocked helpers + + pub fn updateHead(self: *Self) !ProtoBlock { + self.mutex.lock(); + defer self.mutex.unlock(); + return self.updateHeadUnlocked(); + } + + pub fn onBlock(self: *Self, block: types.BeamBlock, state: *const types.BeamState, opts: OnBlockOpts) !ProtoBlock { + self.mutex.lock(); + defer self.mutex.unlock(); + return self.onBlockUnlocked(block, state, opts); + } + + pub fn onInterval(self: *Self, time_intervals: usize, has_proposal: bool) !void { + self.mutex.lock(); + defer self.mutex.unlock(); + return self.onIntervalUnlocked(time_intervals, has_proposal); + } + + pub fn onAttestation(self: *Self, signed_attestation: types.SignedAttestation, is_from_block: bool) !void { + self.mutex.lock(); + defer self.mutex.unlock(); + return self.onAttestationUnlocked(signed_attestation, is_from_block); + } + + pub fn updateSafeTarget(self: *Self) !ProtoBlock { + self.mutex.lock(); + defer self.mutex.unlock(); + return self.updateSafeTargetUnlocked(); + } + + pub fn getProposalHead(self: *Self, slot: types.Slot) !types.Checkpoint { + self.mutex.lock(); // Write lock - mutates via onInterval + defer self.mutex.unlock(); + return self.getProposalHeadUnlocked(slot); + } + + // READ-ONLY API - SHARED LOCK + + pub fn getProposalAttestations(self: *Self) ![]types.SignedAttestation { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + return self.getProposalAttestationsUnlocked(); + } + + pub fn getAttestationTarget(self: *Self) !types.Checkpoint { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + return self.getAttestationTargetUnlocked(); + } + + pub fn hasBlock(self: *Self, blockRoot: types.Root) bool { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + return self.hasBlockUnlocked(blockRoot); + } + + pub fn getAncestorsOfFinalized(self: *Self, allocator: Allocator, currentFinalized: types.Root, previousFinalizedSlot: types.Slot) ![]types.Root { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + return self.getAncestorsOfFinalizedUnlocked(allocator, currentFinalized, previousFinalizedSlot); + } + + // SAFE GETTERS FOR SHARED STATE + // These provide thread-safe access to internal state + + /// Get a copy of the current head block + pub fn getHead(self: *Self) ProtoBlock { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + return self.head; + } + + /// Get the latest justified checkpoint + pub fn getLatestJustified(self: *Self) types.Checkpoint { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + return self.fcStore.latest_justified; + } + + /// Get the latest finalized checkpoint + pub fn getLatestFinalized(self: *Self) types.Checkpoint { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + return self.fcStore.latest_finalized; + } + + /// Get the current time in slots + pub fn getCurrentSlot(self: *Self) types.Slot { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + return self.fcStore.timeSlots; + } + + /// Check if a block exists and get its slot (thread-safe) + pub fn getBlockSlot(self: *Self, blockRoot: types.Root) ?types.Slot { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + const idx = self.protoArray.indices.get(blockRoot) orelse return null; + return self.protoArray.nodes.items[idx].slot; + } + + /// Get a ProtoNode by root (returns a copy) + pub fn getProtoNode(self: *Self, blockRoot: types.Root) ?ProtoNode { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + const idx = self.protoArray.indices.get(blockRoot) orelse return null; + return self.protoArray.nodes.items[idx]; + } }; const ForkChoiceError = error{ From f7c1b9dc0ad172fe0c99512d75cb59ff9268ff82 Mon Sep 17 00:00:00 2001 From: Chetany Bhardwaj Date: Mon, 15 Dec 2025 01:08:43 +0530 Subject: [PATCH 20/29] feat: add orbhan block check and marking in forkchoice visualization --- pkgs/cli/src/api_server.zig | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/pkgs/cli/src/api_server.zig b/pkgs/cli/src/api_server.zig index e8a5a384..66e81df8 100644 --- a/pkgs/cli/src/api_server.zig +++ b/pkgs/cli/src/api_server.zig @@ -117,6 +117,9 @@ fn handleForkChoiceGraph(request: *std.http.Server.Request, allocator: std.mem.A } } + // Cap max_slots to prevent resource exhaustion + if (max_slots > 200) max_slots = 200; + // Build the graph data var graph_json = std.ArrayList(u8).init(allocator); defer graph_json.deinit(); @@ -203,12 +206,30 @@ fn buildGraphJSON( break :blk false; }; + // Get finalized slot for orphaned block detection + const finalized_slot = if (finalized_idx) |fin_idx| proto_nodes[fin_idx].slot else 0; + + // A block is orphaned if: + // 1. It's at or before finalized slot, AND + // 2. It's NOT on the canonical chain (not finalized) + const is_orphaned = blk: { + // Only blocks at or before finalized slot can be orphaned + if (pnode.slot > finalized_slot) break :blk false; + // If already finalized (canonical), not orphaned + if (is_finalized) break :blk false; + + // If it's old enough to be finalized but isn't, it's orphaned + break :blk true; + }; + const role = if (is_finalized) "finalized" else if (is_justified) "justified" else if (is_head) "head" + else if (is_orphaned) + "orphaned" else "normal"; @@ -222,11 +243,12 @@ fn buildGraphJSON( // Use separate arc fields for each color (only one is set per node, others are 0) // This allows manual arc section configuration with explicit colors // TODO: Use chain.forkChoice.isBlockTimely(blockDelayMs) once implemented - // For now, treat all non-finalized/non-justified/non-head blocks as timely - const arc_timely: f64 = if (!is_finalized and !is_justified and !is_head) arc_weight else 0.0; + // For now, treat all non-finalized/non-justified/non-head/non-orphaned blocks as timely + const arc_timely: f64 = if (!is_finalized and !is_justified and !is_head and !is_orphaned) arc_weight else 0.0; const arc_head: f64 = if (is_head) arc_weight else 0.0; const arc_justified: f64 = if (is_justified) arc_weight else 0.0; const arc_finalized: f64 = if (is_finalized) arc_weight else 0.0; + const arc_orphaned: f64 = if (is_orphaned) arc_weight else 0.0; // Block root as hex const hex_prefix = try std.fmt.allocPrint(allocator, "{s}", .{std.fmt.fmtSliceHexLower(pnode.blockRoot[0..4])}); @@ -239,7 +261,7 @@ fn buildGraphJSON( } try std.fmt.format(nodes_list.writer(), - \\{{"id":"{s}","title":"Slot {d}","mainStat":"{d}","secondaryStat":"{d}","arc__timely":{d:.4},"arc__head":{d:.4},"arc__justified":{d:.4},"arc__finalized":{d:.4},"detail__role":"{s}","detail__hex_prefix":"{s}"}} + \\{{"id":"{s}","title":"Slot {d}","mainStat":"{d}","secondaryStat":"{d}","arc__timely":{d:.4},"arc__head":{d:.4},"arc__justified":{d:.4},"arc__finalized":{d:.4},"arc__orphaned":{d:.4},"detail__role":"{s}","detail__hex_prefix":"{s}"}} , .{ full_root, pnode.slot, @@ -249,6 +271,7 @@ fn buildGraphJSON( arc_head, arc_justified, arc_finalized, + arc_orphaned, role, hex_prefix, }); From 057bc5a347526d64c91368785eff94d7fd7d85fc Mon Sep 17 00:00:00 2001 From: Chetany Bhardwaj Date: Mon, 15 Dec 2025 01:10:18 +0530 Subject: [PATCH 21/29] chore: lint fix --- pkgs/node/src/forkchoice.zig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkgs/node/src/forkchoice.zig b/pkgs/node/src/forkchoice.zig index 07acfb82..3d7c7c37 100644 --- a/pkgs/node/src/forkchoice.zig +++ b/pkgs/node/src/forkchoice.zig @@ -715,7 +715,7 @@ pub const ForkChoice = struct { } pub fn getProposalHead(self: *Self, slot: types.Slot) !types.Checkpoint { - self.mutex.lock(); // Write lock - mutates via onInterval + self.mutex.lock(); // Write lock - mutates via onInterval defer self.mutex.unlock(); return self.getProposalHeadUnlocked(slot); } From ab66e8ef6f3863bc7e77dacf5330f2e8b0108e71 Mon Sep 17 00:00:00 2001 From: Chetany Bhardwaj Date: Sat, 20 Dec 2025 02:55:18 +0530 Subject: [PATCH 22/29] remove mutex and global chain --- pkgs/cli/src/api_server.zig | 72 ++++++++++--------------------------- pkgs/cli/src/main.zig | 12 +++---- pkgs/cli/src/node.zig | 7 +--- 3 files changed, 24 insertions(+), 67 deletions(-) diff --git a/pkgs/cli/src/api_server.zig b/pkgs/cli/src/api_server.zig index 66e81df8..890337a2 100644 --- a/pkgs/cli/src/api_server.zig +++ b/pkgs/cli/src/api_server.zig @@ -4,38 +4,21 @@ const constants = @import("constants.zig"); const event_broadcaster = api.event_broadcaster; const node = @import("@zeam/node"); -// Global chain reference for API access -var global_chain: ?*node.BeamChain = null; -var chain_mutex = std.Thread.Mutex{}; - -/// Register the chain for API access -pub fn registerChain(chain: *node.BeamChain) void { - chain_mutex.lock(); - defer chain_mutex.unlock(); - global_chain = chain; -} - -/// Get the global chain reference -fn getChain() ?*node.BeamChain { - chain_mutex.lock(); - defer chain_mutex.unlock(); - return global_chain; -} +const QUERY_SLOTS_PREFIX = "?slots="; +const DEFAULT_MAX_SLOTS: usize = 50; +const MAX_ALLOWED_SLOTS: usize = 200; -/// Simple metrics server that runs in a background thread -pub fn startAPIServer(allocator: std.mem.Allocator, port: u16) !void { - // Initialize the global event broadcaster +pub fn startAPIServer(allocator: std.mem.Allocator, port: u16, forkchoice: *node.fcFactory.ForkChoice) !void { try event_broadcaster.initGlobalBroadcaster(allocator); - // Create a simple HTTP server context const ctx = try allocator.create(SimpleMetricsServer); errdefer allocator.destroy(ctx); ctx.* = .{ .allocator = allocator, .port = port, + .forkchoice = forkchoice, }; - // Start server in background thread const thread = try std.Thread.spawn(.{}, SimpleMetricsServer.run, .{ctx}); thread.detach(); @@ -43,7 +26,7 @@ pub fn startAPIServer(allocator: std.mem.Allocator, port: u16) !void { } /// Handle individual HTTP connections in a separate thread -fn handleConnection(connection: std.net.Server.Connection, allocator: std.mem.Allocator) void { +fn handleConnection(connection: std.net.Server.Connection, allocator: std.mem.Allocator, forkchoice: *node.fcFactory.ForkChoice) void { defer connection.stream.close(); var buffer: [4096]u8 = undefined; @@ -84,7 +67,7 @@ fn handleConnection(connection: std.net.Server.Connection, allocator: std.mem.Al }) catch {}; } else if (std.mem.startsWith(u8, request.head.target, "/api/forkchoice/graph")) { // Handle fork choice graph request - handleForkChoiceGraph(&request, allocator) catch |err| { + handleForkChoiceGraph(&request, allocator, forkchoice) catch |err| { std.log.warn("Fork choice graph request failed: {}", .{err}); _ = request.respond("Internal Server Error\n", .{}) catch {}; }; @@ -93,40 +76,24 @@ fn handleConnection(connection: std.net.Server.Connection, allocator: std.mem.Al } } -/// Handle fork choice graph API request -fn handleForkChoiceGraph(request: *std.http.Server.Request, allocator: std.mem.Allocator) !void { - // Get chain reference - const chain = getChain() orelse { - const error_response = "{\"error\":\"Chain not initialized\"}"; - _ = request.respond(error_response, .{ - .extra_headers = &.{ - .{ .name = "content-type", .value = "application/json; charset=utf-8" }, - }, - }) catch {}; - return; - }; - - // Parse query parameters for max_slots (default: 50) - var max_slots: usize = 50; - if (std.mem.indexOf(u8, request.head.target, "?slots=")) |query_start| { - const slots_param = request.head.target[query_start + 7 ..]; +fn handleForkChoiceGraph(request: *std.http.Server.Request, allocator: std.mem.Allocator, forkchoice: *node.fcFactory.ForkChoice) !void { + var max_slots: usize = DEFAULT_MAX_SLOTS; + if (std.mem.indexOf(u8, request.head.target, QUERY_SLOTS_PREFIX)) |query_start| { + const slots_param = request.head.target[query_start + QUERY_SLOTS_PREFIX.len ..]; if (std.mem.indexOf(u8, slots_param, "&")) |end| { - max_slots = std.fmt.parseInt(usize, slots_param[0..end], 10) catch 50; + max_slots = std.fmt.parseInt(usize, slots_param[0..end], 10) catch DEFAULT_MAX_SLOTS; } else { - max_slots = std.fmt.parseInt(usize, slots_param, 10) catch 50; + max_slots = std.fmt.parseInt(usize, slots_param, 10) catch DEFAULT_MAX_SLOTS; } } - // Cap max_slots to prevent resource exhaustion - if (max_slots > 200) max_slots = 200; + if (max_slots > MAX_ALLOWED_SLOTS) max_slots = MAX_ALLOWED_SLOTS; - // Build the graph data var graph_json = std.ArrayList(u8).init(allocator); defer graph_json.deinit(); - try buildGraphJSON(chain, graph_json.writer(), max_slots, allocator); + try buildGraphJSON(forkchoice, graph_json.writer(), max_slots, allocator); - // Send response _ = request.respond(graph_json.items, .{ .extra_headers = &.{ .{ .name = "content-type", .value = "application/json; charset=utf-8" }, @@ -137,13 +104,12 @@ fn handleForkChoiceGraph(request: *std.http.Server.Request, allocator: std.mem.A /// Build fork choice graph in Grafana node-graph JSON format fn buildGraphJSON( - chain: *node.BeamChain, + forkchoice: *node.fcFactory.ForkChoice, writer: anytype, max_slots: usize, allocator: std.mem.Allocator, ) !void { - // Thread-safe snapshot - lock held only during copy - const snapshot = try chain.forkChoice.snapshot(allocator); + const snapshot = try forkchoice.snapshot(allocator); defer snapshot.deinit(allocator); const proto_nodes = snapshot.nodes; @@ -315,9 +281,9 @@ fn buildGraphJSON( const SimpleMetricsServer = struct { allocator: std.mem.Allocator, port: u16, + forkchoice: *node.fcFactory.ForkChoice, fn run(self: *SimpleMetricsServer) !void { - // `startMetricsServer` creates this, so we need to free it here defer self.allocator.destroy(self); const address = try std.net.Address.parseIp4("0.0.0.0", self.port); var server = try address.listen(.{ .reuse_address = true }); @@ -330,7 +296,7 @@ const SimpleMetricsServer = struct { // For SSE connections, we need to handle them differently // We'll spawn a new thread for each connection to handle persistence - _ = std.Thread.spawn(.{}, handleConnection, .{ connection, self.allocator }) catch |err| { + _ = std.Thread.spawn(.{}, handleConnection, .{ connection, self.allocator, self.forkchoice }) catch |err| { std.log.warn("Failed to spawn connection handler: {}", .{err}); connection.stream.close(); continue; diff --git a/pkgs/cli/src/main.zig b/pkgs/cli/src/main.zig index fc600b89..6b3ae796 100644 --- a/pkgs/cli/src/main.zig +++ b/pkgs/cli/src/main.zig @@ -301,12 +301,6 @@ fn mainInner() !void { return err; }; - // Start metrics HTTP server - api_server.startAPIServer(allocator, beamcmd.metricsPort) catch |err| { - ErrorHandler.logErrorWithDetails(err, "start API server", .{ .port = beamcmd.metricsPort }); - return err; - }; - std.debug.print("beam opts ={any}\n", .{beamcmd}); const mock_network = beamcmd.mockNetwork; @@ -474,8 +468,10 @@ fn mainInner() !void { .node_registry = registry_1, }); - // Register beam_node_1's chain for fork choice graph API - api_server.registerChain(beam_node_1.chain); + api_server.startAPIServer(allocator, beamcmd.metricsPort, &beam_node_1.chain.forkChoice) catch |err| { + ErrorHandler.logErrorWithDetails(err, "start API server", .{ .port = beamcmd.metricsPort }); + return err; + }; var beam_node_2: BeamNode = undefined; try beam_node_2.init(allocator, .{ diff --git a/pkgs/cli/src/node.zig b/pkgs/cli/src/node.zig index 45113000..cc969cb4 100644 --- a/pkgs/cli/src/node.zig +++ b/pkgs/cli/src/node.zig @@ -107,12 +107,8 @@ pub const Node = struct { self.allocator = allocator; self.options = options; - // Initialize event broadcaster - try event_broadcaster.initGlobalBroadcaster(allocator); - if (options.metrics_enable) { try api.init(allocator); - try api_server.startAPIServer(allocator, options.metrics_port); } // some base mainnet spec would be loaded to build this up @@ -176,9 +172,8 @@ pub const Node = struct { .node_registry = options.node_registry, }); - // Register the chain with the API server for fork choice graph endpoint if (options.metrics_enable) { - api_server.registerChain(self.beam_node.chain); + try api_server.startAPIServer(allocator, options.metrics_port, &self.beam_node.chain.forkChoice); } self.logger = options.logger_config.logger(.node); From 5da0cd8c6124422650ea715bd84eea12d4592dd8 Mon Sep 17 00:00:00 2001 From: Chetany Bhardwaj Date: Wed, 31 Dec 2025 20:47:01 +0530 Subject: [PATCH 23/29] fix: silence broadcaster-not-initialized warnings --- pkgs/node/src/chain.zig | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/pkgs/node/src/chain.zig b/pkgs/node/src/chain.zig index 6464b948..fbfdb5d2 100644 --- a/pkgs/node/src/chain.zig +++ b/pkgs/node/src/chain.zig @@ -504,7 +504,9 @@ pub const BeamChain = struct { if (api.events.NewHeadEvent.fromProtoBlock(self.allocator, new_head)) |head_event| { var chain_event = api.events.ChainEvent{ .new_head = head_event }; event_broadcaster.broadcastGlobalEvent(&chain_event) catch |err| { - self.module_logger.warn("Failed to broadcast head event: {any}", .{err}); + if (err != error.BroadcasterNotInitialized) { + self.module_logger.warn("Failed to broadcast head event: {any}", .{err}); + } chain_event.deinit(self.allocator); }; } else |err| { @@ -528,7 +530,9 @@ pub const BeamChain = struct { if (api.events.NewJustificationEvent.fromCheckpoint(self.allocator, latest_justified, new_head.slot)) |just_event| { var chain_event = api.events.ChainEvent{ .new_justification = just_event }; event_broadcaster.broadcastGlobalEvent(&chain_event) catch |err| { - self.module_logger.warn("Failed to broadcast justification event: {any}", .{err}); + if (err != error.BroadcasterNotInitialized) { + self.module_logger.warn("Failed to broadcast justification event: {any}", .{err}); + } chain_event.deinit(self.allocator); }; self.last_emitted_justified_slot = latest_justified.slot; @@ -542,7 +546,9 @@ pub const BeamChain = struct { if (api.events.NewFinalizationEvent.fromCheckpoint(self.allocator, latest_finalized, new_head.slot)) |final_event| { var chain_event = api.events.ChainEvent{ .new_finalization = final_event }; event_broadcaster.broadcastGlobalEvent(&chain_event) catch |err| { - self.module_logger.warn("Failed to broadcast finalization event: {any}", .{err}); + if (err != error.BroadcasterNotInitialized) { + self.module_logger.warn("Failed to broadcast finalization event: {any}", .{err}); + } chain_event.deinit(self.allocator); }; self.last_emitted_finalized_slot = latest_finalized.slot; From 26889b10fcca90d34255e1e77ac01e5f657fb088 Mon Sep 17 00:00:00 2001 From: Chetany Bhardwaj Date: Wed, 31 Dec 2025 21:05:12 +0530 Subject: [PATCH 24/29] feat: use per-connection arena for metrics server allocations --- pkgs/cli/src/api_server.zig | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pkgs/cli/src/api_server.zig b/pkgs/cli/src/api_server.zig index 890337a2..2dbb1117 100644 --- a/pkgs/cli/src/api_server.zig +++ b/pkgs/cli/src/api_server.zig @@ -29,6 +29,10 @@ pub fn startAPIServer(allocator: std.mem.Allocator, port: u16, forkchoice: *node fn handleConnection(connection: std.net.Server.Connection, allocator: std.mem.Allocator, forkchoice: *node.fcFactory.ForkChoice) void { defer connection.stream.close(); + var arena = std.heap.ArenaAllocator.init(allocator); + defer arena.deinit(); + const request_allocator = arena.allocator(); + var buffer: [4096]u8 = undefined; var http_server = std.http.Server.init(connection, &buffer); var request = http_server.receiveHead() catch |err| { @@ -39,12 +43,12 @@ fn handleConnection(connection: std.net.Server.Connection, allocator: std.mem.Al // Route handling if (std.mem.eql(u8, request.head.target, "/events")) { // Handle SSE connection - this will keep the connection alive - SimpleMetricsServer.handleSSEEvents(connection.stream, allocator) catch |err| { + SimpleMetricsServer.handleSSEEvents(connection.stream, request_allocator) catch |err| { std.log.warn("SSE connection failed: {}", .{err}); }; } else if (std.mem.eql(u8, request.head.target, "/metrics")) { // Handle metrics request - var metrics_output = std.ArrayList(u8).init(allocator); + var metrics_output = std.ArrayList(u8).init(request_allocator); defer metrics_output.deinit(); api.writeMetrics(metrics_output.writer()) catch { @@ -67,7 +71,7 @@ fn handleConnection(connection: std.net.Server.Connection, allocator: std.mem.Al }) catch {}; } else if (std.mem.startsWith(u8, request.head.target, "/api/forkchoice/graph")) { // Handle fork choice graph request - handleForkChoiceGraph(&request, allocator, forkchoice) catch |err| { + handleForkChoiceGraph(&request, request_allocator, forkchoice) catch |err| { std.log.warn("Fork choice graph request failed: {}", .{err}); _ = request.respond("Internal Server Error\n", .{}) catch {}; }; From 930ad701e758a3150d7d0b038f33a46d216735f6 Mon Sep 17 00:00:00 2001 From: Chetany Bhardwaj Date: Wed, 31 Dec 2025 21:50:16 +0530 Subject: [PATCH 25/29] fix: make metrics server stoppable and avoid forkchoice use after free --- pkgs/cli/src/api_server.zig | 91 ++++++++++++++++++++++++------------- pkgs/cli/src/main.zig | 5 +- pkgs/cli/src/node.zig | 7 ++- 3 files changed, 69 insertions(+), 34 deletions(-) diff --git a/pkgs/cli/src/api_server.zig b/pkgs/cli/src/api_server.zig index 2dbb1117..0a70adc4 100644 --- a/pkgs/cli/src/api_server.zig +++ b/pkgs/cli/src/api_server.zig @@ -7,8 +7,19 @@ const node = @import("@zeam/node"); const QUERY_SLOTS_PREFIX = "?slots="; const DEFAULT_MAX_SLOTS: usize = 50; const MAX_ALLOWED_SLOTS: usize = 200; +const ACCEPT_POLL_NS: u64 = 50 * std.time.ns_per_ms; -pub fn startAPIServer(allocator: std.mem.Allocator, port: u16, forkchoice: *node.fcFactory.ForkChoice) !void { +pub const APIServerHandle = struct { + thread: std.Thread, + ctx: *SimpleMetricsServer, + + pub fn stop(self: *APIServerHandle) void { + self.ctx.stop.store(true, .seq_cst); + self.thread.join(); + } +}; + +pub fn startAPIServer(allocator: std.mem.Allocator, port: u16, forkchoice: *node.fcFactory.ForkChoice) !APIServerHandle { try event_broadcaster.initGlobalBroadcaster(allocator); const ctx = try allocator.create(SimpleMetricsServer); @@ -17,37 +28,24 @@ pub fn startAPIServer(allocator: std.mem.Allocator, port: u16, forkchoice: *node .allocator = allocator, .port = port, .forkchoice = forkchoice, + .stop = std.atomic.Value(bool).init(false), }; const thread = try std.Thread.spawn(.{}, SimpleMetricsServer.run, .{ctx}); - thread.detach(); std.log.info("Metrics server started on port {d}", .{port}); + return .{ + .thread = thread, + .ctx = ctx, + }; } -/// Handle individual HTTP connections in a separate thread -fn handleConnection(connection: std.net.Server.Connection, allocator: std.mem.Allocator, forkchoice: *node.fcFactory.ForkChoice) void { - defer connection.stream.close(); - +fn handleNonSSERequest(request: *std.http.Server.Request, allocator: std.mem.Allocator, forkchoice: *node.fcFactory.ForkChoice) void { var arena = std.heap.ArenaAllocator.init(allocator); defer arena.deinit(); const request_allocator = arena.allocator(); - var buffer: [4096]u8 = undefined; - var http_server = std.http.Server.init(connection, &buffer); - var request = http_server.receiveHead() catch |err| { - std.log.warn("Failed to receive HTTP head: {}", .{err}); - return; - }; - - // Route handling - if (std.mem.eql(u8, request.head.target, "/events")) { - // Handle SSE connection - this will keep the connection alive - SimpleMetricsServer.handleSSEEvents(connection.stream, request_allocator) catch |err| { - std.log.warn("SSE connection failed: {}", .{err}); - }; - } else if (std.mem.eql(u8, request.head.target, "/metrics")) { - // Handle metrics request + if (std.mem.eql(u8, request.head.target, "/metrics")) { var metrics_output = std.ArrayList(u8).init(request_allocator); defer metrics_output.deinit(); @@ -62,7 +60,6 @@ fn handleConnection(connection: std.net.Server.Connection, allocator: std.mem.Al }, }) catch {}; } else if (std.mem.eql(u8, request.head.target, "/health")) { - // Handle health check const response = "{\"status\":\"healthy\",\"service\":\"zeam-metrics\"}"; _ = request.respond(response, .{ .extra_headers = &.{ @@ -70,8 +67,7 @@ fn handleConnection(connection: std.net.Server.Connection, allocator: std.mem.Al }, }) catch {}; } else if (std.mem.startsWith(u8, request.head.target, "/api/forkchoice/graph")) { - // Handle fork choice graph request - handleForkChoiceGraph(&request, request_allocator, forkchoice) catch |err| { + handleForkChoiceGraph(request, request_allocator, forkchoice) catch |err| { std.log.warn("Fork choice graph request failed: {}", .{err}); _ = request.respond("Internal Server Error\n", .{}) catch {}; }; @@ -80,6 +76,27 @@ fn handleConnection(connection: std.net.Server.Connection, allocator: std.mem.Al } } +fn routeConnection(connection: std.net.Server.Connection, allocator: std.mem.Allocator, forkchoice: *node.fcFactory.ForkChoice) void { + var buffer: [4096]u8 = undefined; + var http_server = std.http.Server.init(connection, &buffer); + var request = http_server.receiveHead() catch |err| { + std.log.warn("Failed to receive HTTP head: {}", .{err}); + connection.stream.close(); + return; + }; + + if (std.mem.eql(u8, request.head.target, "/events")) { + _ = std.Thread.spawn(.{}, SimpleMetricsServer.handleSSEConnection, .{ connection.stream, allocator }) catch |err| { + std.log.warn("Failed to spawn SSE handler: {}", .{err}); + connection.stream.close(); + }; + return; + } + + handleNonSSERequest(&request, allocator, forkchoice); + connection.stream.close(); +} + fn handleForkChoiceGraph(request: *std.http.Server.Request, allocator: std.mem.Allocator, forkchoice: *node.fcFactory.ForkChoice) !void { var max_slots: usize = DEFAULT_MAX_SLOTS; if (std.mem.indexOf(u8, request.head.target, QUERY_SLOTS_PREFIX)) |query_start| { @@ -286,28 +303,38 @@ const SimpleMetricsServer = struct { allocator: std.mem.Allocator, port: u16, forkchoice: *node.fcFactory.ForkChoice, + stop: std.atomic.Value(bool), fn run(self: *SimpleMetricsServer) !void { defer self.allocator.destroy(self); const address = try std.net.Address.parseIp4("0.0.0.0", self.port); - var server = try address.listen(.{ .reuse_address = true }); + var server = try address.listen(.{ .reuse_address = true, .force_nonblocking = true }); defer server.deinit(); std.log.info("HTTP server listening on http://0.0.0.0:{d}", .{self.port}); while (true) { - const connection = server.accept() catch continue; - - // For SSE connections, we need to handle them differently - // We'll spawn a new thread for each connection to handle persistence - _ = std.Thread.spawn(.{}, handleConnection, .{ connection, self.allocator, self.forkchoice }) catch |err| { - std.log.warn("Failed to spawn connection handler: {}", .{err}); - connection.stream.close(); + if (self.stop.load(.acquire)) break; + const connection = server.accept() catch |err| { + if (err == error.WouldBlock) { + std.time.sleep(ACCEPT_POLL_NS); + continue; + } + std.log.warn("Failed to accept connection: {}", .{err}); continue; }; + + routeConnection(connection, self.allocator, self.forkchoice); } } + fn handleSSEConnection(stream: std.net.Stream, allocator: std.mem.Allocator) void { + SimpleMetricsServer.handleSSEEvents(stream, allocator) catch |err| { + std.log.warn("SSE connection failed: {}", .{err}); + }; + stream.close(); + } + fn handleSSEEvents(stream: std.net.Stream, allocator: std.mem.Allocator) !void { _ = allocator; // Set SSE headers manually by writing HTTP response diff --git a/pkgs/cli/src/main.zig b/pkgs/cli/src/main.zig index 69076a4d..06806d84 100644 --- a/pkgs/cli/src/main.zig +++ b/pkgs/cli/src/main.zig @@ -459,6 +459,9 @@ fn mainInner() !void { const registry_1 = shared_registry; const registry_2 = shared_registry; + var api_server_handle: ?api_server.APIServerHandle = null; + defer if (api_server_handle) |*handle| handle.stop(); + var beam_node_1: BeamNode = undefined; try beam_node_1.init(allocator, .{ // options @@ -474,7 +477,7 @@ fn mainInner() !void { .node_registry = registry_1, }); - api_server.startAPIServer(allocator, beamcmd.metricsPort, &beam_node_1.chain.forkChoice) catch |err| { + api_server_handle = api_server.startAPIServer(allocator, beamcmd.metricsPort, &beam_node_1.chain.forkChoice) catch |err| { ErrorHandler.logErrorWithDetails(err, "start API server", .{ .port = beamcmd.metricsPort }); return err; }; diff --git a/pkgs/cli/src/node.zig b/pkgs/cli/src/node.zig index b05c6148..fa648035 100644 --- a/pkgs/cli/src/node.zig +++ b/pkgs/cli/src/node.zig @@ -119,6 +119,7 @@ pub const Node = struct { logger: zeam_utils.ModuleLogger, db: database.Db, key_manager: key_manager_lib.KeyManager, + api_server_handle: ?api_server.APIServerHandle, const Self = @This(); @@ -129,6 +130,7 @@ pub const Node = struct { ) !void { self.allocator = allocator; self.options = options; + self.api_server_handle = null; if (options.metrics_enable) { try api.init(allocator); @@ -199,13 +201,16 @@ pub const Node = struct { }); if (options.metrics_enable) { - try api_server.startAPIServer(allocator, options.metrics_port, &self.beam_node.chain.forkChoice); + self.api_server_handle = try api_server.startAPIServer(allocator, options.metrics_port, &self.beam_node.chain.forkChoice); } self.logger = options.logger_config.logger(.node); } pub fn deinit(self: *Self) void { + if (self.api_server_handle) |*handle| { + handle.stop(); + } self.clock.deinit(self.allocator); self.beam_node.deinit(); self.key_manager.deinit(); From 702e2f76cee78644db88151ba4bc9217f9441c36 Mon Sep 17 00:00:00 2001 From: Chetany Bhardwaj Date: Thu, 8 Jan 2026 15:47:23 +0530 Subject: [PATCH 26/29] chore: lint fix --- pkgs/node/src/forkchoice.zig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkgs/node/src/forkchoice.zig b/pkgs/node/src/forkchoice.zig index abce11df..c205eef0 100644 --- a/pkgs/node/src/forkchoice.zig +++ b/pkgs/node/src/forkchoice.zig @@ -364,7 +364,7 @@ pub const ForkChoice = struct { // Internal unlocked version - assumes caller holds lock fn getAncestorsOfFinalizedUnlocked(self: *Self, allocator: Allocator, currentFinalized: types.Root, previousFinalizedSlot: types.Slot) ![]types.Root { var ancestors = std.ArrayList(types.Root).init(allocator); - + var current_idx_or_null = self.protoArray.indices.get(currentFinalized); while (current_idx_or_null) |current_idx| { From 5b36b05aa78837e7012945e69bf61d9fc6eca71d Mon Sep 17 00:00:00 2001 From: Chetany Bhardwaj Date: Thu, 8 Jan 2026 19:16:08 +0530 Subject: [PATCH 27/29] fix: errors and new test fixes --- pkgs/node/src/forkchoice.zig | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/pkgs/node/src/forkchoice.zig b/pkgs/node/src/forkchoice.zig index c205eef0..15004a31 100644 --- a/pkgs/node/src/forkchoice.zig +++ b/pkgs/node/src/forkchoice.zig @@ -1058,6 +1058,18 @@ pub const ForkChoice = struct { return self.confirmBlockUnlocked(blockRoot); } + pub fn computeDeltas(self: *Self, from_known: bool) ![]isize { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + return self.computeDeltasUnlocked(from_known); + } + + pub fn applyDeltas(self: *Self, deltas: []isize, cutoff_weight: u64) !void { + self.mutex.lock(); + defer self.mutex.unlock(); + return self.protoArray.applyDeltasUnlocked(deltas, cutoff_weight); + } + // SAFE GETTERS FOR SHARED STATE // These provide thread-safe access to internal state @@ -1630,6 +1642,7 @@ fn buildTestTreeWithMockChain(allocator: Allocator, mock_chain: anytype) !struct .safeTarget = createTestProtoBlock(8, 0xFF, 0xEE), .deltas = std.ArrayList(isize).init(allocator), .logger = module_logger, + .mutex = Thread.RwLock{}, }; return .{ @@ -1823,7 +1836,7 @@ test "rebase: bestChild and bestDescendant remapping" { // Apply deltas to establish weights and bestChild/bestDescendant const deltas = try ctx.fork_choice.computeDeltas(true); - try ctx.fork_choice.protoArray.applyDeltas(deltas, 0); + try ctx.fork_choice.applyDeltas(deltas, 0); // Verify pre-rebase bestChild/bestDescendant // C(2) should have bestChild=3(D) since D branch has all 4 votes @@ -1923,7 +1936,7 @@ test "rebase: weight preservation after rebase" { // Apply deltas to establish weights const deltas = try ctx.fork_choice.computeDeltas(true); - try ctx.fork_choice.protoArray.applyDeltas(deltas, 0); + try ctx.fork_choice.applyDeltas(deltas, 0); // Record pre-rebase weights for nodes that will remain const pre_rebase_weight_C = ctx.fork_choice.protoArray.nodes.items[2].weight; // C @@ -2578,6 +2591,7 @@ test "rebase: heavy attestation load - all validators tracked correctly" { .safeTarget = createTestProtoBlock(3, 0xDD, 0xCC), .deltas = std.ArrayList(isize).init(allocator), .logger = module_logger, + .mutex = Thread.RwLock{}, }; // Note: We don't defer proto_array.nodes/indices.deinit() here because they're // moved into fork_choice and will be deinitialized separately From f7f00508afc3d412f5c4f4c806ac04eed6182a38 Mon Sep 17 00:00:00 2001 From: Chetany Bhardwaj Date: Thu, 8 Jan 2026 19:53:05 +0530 Subject: [PATCH 28/29] feat: add mutex based safeTarget fetching --- pkgs/node/src/chain.zig | 2 +- pkgs/node/src/forkchoice.zig | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/pkgs/node/src/chain.zig b/pkgs/node/src/chain.zig index 3bc3b841..f84433d8 100644 --- a/pkgs/node/src/chain.zig +++ b/pkgs/node/src/chain.zig @@ -334,7 +334,7 @@ pub const BeamChain = struct { const head_str = try head.toJsonString(self.allocator); defer self.allocator.free(head_str); - const safe_target_proto = self.forkChoice.safeTarget; + const safe_target_proto = self.forkChoice.getSafeTarget(); const safe_target: types.Checkpoint = .{ .root = safe_target_proto.blockRoot, .slot = safe_target_proto.slot, diff --git a/pkgs/node/src/forkchoice.zig b/pkgs/node/src/forkchoice.zig index 15004a31..cea87b16 100644 --- a/pkgs/node/src/forkchoice.zig +++ b/pkgs/node/src/forkchoice.zig @@ -1080,6 +1080,13 @@ pub const ForkChoice = struct { return self.head; } + /// Get the current safe target block (thread-safe) + pub fn getSafeTarget(self: *Self) ProtoBlock { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + return self.safeTarget; + } + /// Get the latest justified checkpoint pub fn getLatestJustified(self: *Self) types.Checkpoint { self.mutex.lockShared(); From a09df93ddb92c01ee89a59edecbe00f68bf70e04 Mon Sep 17 00:00:00 2001 From: Chetany Bhardwaj Date: Thu, 8 Jan 2026 20:56:15 +0530 Subject: [PATCH 29/29] chore: cleanup unused functions --- pkgs/node/src/chain.zig | 1 - pkgs/node/src/forkchoice.zig | 53 ------------------------------------ 2 files changed, 54 deletions(-) diff --git a/pkgs/node/src/chain.zig b/pkgs/node/src/chain.zig index f84433d8..8f4ae8c1 100644 --- a/pkgs/node/src/chain.zig +++ b/pkgs/node/src/chain.zig @@ -325,7 +325,6 @@ pub const BeamChain = struct { pub fn constructAttestationData(self: *Self, opts: AttestationConstructionParams) !types.AttestationData { const slot = opts.slot; - // const head = try self.forkChoice.getProposalHead(slot); const head_proto = self.forkChoice.getHead(); const head: types.Checkpoint = .{ .root = head_proto.blockRoot, diff --git a/pkgs/node/src/forkchoice.zig b/pkgs/node/src/forkchoice.zig index cea87b16..fb9c2b07 100644 --- a/pkgs/node/src/forkchoice.zig +++ b/pkgs/node/src/forkchoice.zig @@ -358,29 +358,6 @@ pub const ForkChoice = struct { return false; } - /// Get all ancestor block roots from the current finalized block, - /// traversing backwards, and collecting all blocks with slot > previousFinalizedSlot. - /// Stops traversal when previousFinalizedSlot is reached or at genesis. - // Internal unlocked version - assumes caller holds lock - fn getAncestorsOfFinalizedUnlocked(self: *Self, allocator: Allocator, currentFinalized: types.Root, previousFinalizedSlot: types.Slot) ![]types.Root { - var ancestors = std.ArrayList(types.Root).init(allocator); - - var current_idx_or_null = self.protoArray.indices.get(currentFinalized); - - while (current_idx_or_null) |current_idx| { - const current_node = self.protoArray.nodes.items[current_idx]; - if (current_node.slot < previousFinalizedSlot) { - return error.InvalidFinalizationTraversal; - } else if (current_node.slot == previousFinalizedSlot) { - break; - } else { - try ancestors.append(current_node.blockRoot); - current_idx_or_null = current_node.parent; - } - } - return ancestors.toOwnedSlice(); - } - /// Builds a canonical view hashmap containing all blocks in the canonical chain /// from targetAnchor back to prevAnchor, plus all their unfinalized descendants. // Internal unlocked version - assumes caller holds lock @@ -687,24 +664,6 @@ pub const ForkChoice = struct { return self.updateHeadUnlocked(); } - // Internal unlocked version - assumes caller holds lock - fn getProposalHeadUnlocked(self: *Self, slot: types.Slot) !types.Checkpoint { - const time_intervals = slot * constants.INTERVALS_PER_SLOT; - // this could be called independently by the validator when its a separate process - // and FC would need to be protected by mutex to make it thread safe but for now - // this is deterministally called after the fc has been ticked ahead - // so the following call should be a no-op - try self.onIntervalUnlocked(time_intervals, true); - // accept any new attestations in case previous ontick was a no-op and either the validator - // wasn't registered or there have been new attestations - const head = try self.acceptNewAttestationsUnlocked(); - - return types.Checkpoint{ - .root = head.blockRoot, - .slot = head.slot, - }; - } - // Internal unlocked version - assumes caller holds lock fn getProposalAttestationsUnlocked(self: *Self) ![]types.SignedAttestation { var included_attestations = std.ArrayList(types.SignedAttestation).init(self.allocator); @@ -990,12 +949,6 @@ pub const ForkChoice = struct { return self.updateSafeTargetUnlocked(); } - pub fn getProposalHead(self: *Self, slot: types.Slot) !types.Checkpoint { - self.mutex.lock(); // Write lock - mutates via onInterval - defer self.mutex.unlock(); - return self.getProposalHeadUnlocked(slot); - } - // READ-ONLY API - SHARED LOCK pub fn getProposalAttestations(self: *Self) ![]types.SignedAttestation { @@ -1022,12 +975,6 @@ pub const ForkChoice = struct { return self.getBlockUnlocked(blockRoot); } - pub fn getAncestorsOfFinalized(self: *Self, allocator: Allocator, currentFinalized: types.Root, previousFinalizedSlot: types.Slot) ![]types.Root { - self.mutex.lockShared(); - defer self.mutex.unlockShared(); - return self.getAncestorsOfFinalizedUnlocked(allocator, currentFinalized, previousFinalizedSlot); - } - pub fn getCanonicalView(self: *Self, canonical_view: *std.AutoHashMap(types.Root, void), targetAnchorRoot: types.Root, prevAnchorRootOrNull: ?types.Root) !void { self.mutex.lockShared(); defer self.mutex.unlockShared();