diff --git a/pkgs/cli/src/api_server.zig b/pkgs/cli/src/api_server.zig index 792128a1..b7c1ea84 100644 --- a/pkgs/cli/src/api_server.zig +++ b/pkgs/cli/src/api_server.zig @@ -2,47 +2,99 @@ const std = @import("std"); const api = @import("@zeam/api"); const constants = @import("constants.zig"); const event_broadcaster = api.event_broadcaster; +const node = @import("@zeam/node"); -/// Simple metrics server that runs in a background thread -pub fn startAPIServer(allocator: std.mem.Allocator, port: u16) !void { - // Initialize the global event broadcaster +const QUERY_SLOTS_PREFIX = "?slots="; +const DEFAULT_MAX_SLOTS: usize = 50; +const MAX_ALLOWED_SLOTS: usize = 200; +const ACCEPT_POLL_NS: u64 = 50 * std.time.ns_per_ms; +// Conservative defaults for a local metrics server. +const MAX_SSE_CONNECTIONS: usize = 32; +const MAX_GRAPH_INFLIGHT: usize = 2; +const RATE_LIMIT_RPS: f64 = 2.0; +const RATE_LIMIT_BURST: f64 = 5.0; +const RATE_LIMIT_MAX_ENTRIES: usize = 256; // Max tracked IPs to bound memory. +const RATE_LIMIT_CLEANUP_THRESHOLD: usize = RATE_LIMIT_MAX_ENTRIES / 2; // Trigger lazy cleanup. +const RATE_LIMIT_STALE_NS: u64 = 10 * std.time.ns_per_min; // Evict entries idle past TTL. +const RATE_LIMIT_CLEANUP_COOLDOWN_NS: u64 = 60 * std.time.ns_per_s; + +pub const APIServerHandle = struct { + thread: std.Thread, + ctx: *SimpleMetricsServer, + + pub fn stop(self: *APIServerHandle) void { + self.ctx.stop.store(true, .seq_cst); + self.thread.join(); + } +}; + +pub fn startAPIServer(allocator: std.mem.Allocator, port: u16, forkchoice: *node.fcFactory.ForkChoice) !APIServerHandle { try event_broadcaster.initGlobalBroadcaster(allocator); - // Create a simple HTTP server context + var rate_limiter = try RateLimiter.init(allocator); + errdefer rate_limiter.deinit(); + const ctx = try allocator.create(SimpleMetricsServer); errdefer allocator.destroy(ctx); ctx.* = .{ .allocator = allocator, .port = port, + .forkchoice = forkchoice, + .stop = std.atomic.Value(bool).init(false), + .sse_active = 0, + .graph_inflight = 0, + .rate_limiter = rate_limiter, }; - // Start server in background thread const thread = try std.Thread.spawn(.{}, SimpleMetricsServer.run, .{ctx}); - thread.detach(); std.log.info("Metrics server started on port {d}", .{port}); + return .{ + .thread = thread, + .ctx = ctx, + }; } -/// Handle individual HTTP connections in a separate thread -fn handleConnection(connection: std.net.Server.Connection, allocator: std.mem.Allocator) void { - defer connection.stream.close(); - +fn routeConnection(connection: std.net.Server.Connection, allocator: std.mem.Allocator, forkchoice: *node.fcFactory.ForkChoice, ctx: *SimpleMetricsServer) void { var buffer: [4096]u8 = undefined; var http_server = std.http.Server.init(connection, &buffer); var request = http_server.receiveHead() catch |err| { std.log.warn("Failed to receive HTTP head: {}", .{err}); + connection.stream.close(); return; }; - // Route handling if (std.mem.eql(u8, request.head.target, "/events")) { - // Handle SSE connection - this will keep the connection alive - SimpleMetricsServer.handleSSEEvents(connection.stream, allocator) catch |err| { - std.log.warn("SSE connection failed: {}", .{err}); + if (!ctx.tryAcquireSSE()) { + _ = request.respond("Service Unavailable\n", .{ .status = .service_unavailable }) catch {}; + connection.stream.close(); + return; + } + _ = std.Thread.spawn(.{}, SimpleMetricsServer.handleSSEConnection, .{ connection.stream, allocator, ctx }) catch |err| { + std.log.warn("Failed to spawn SSE handler: {}", .{err}); + 
ctx.releaseSSE();
+            connection.stream.close();
+            // End the catch block with a return so it is noreturn; a fall-through
+            // void block cannot coerce to the discarded std.Thread result.
+            return;
         };
-    } else if (std.mem.eql(u8, request.head.target, "/metrics")) {
-        // Handle metrics request
-        var metrics_output = std.ArrayList(u8).init(allocator);
+        return;
+    }
+
+    handleNonSSERequestWithAddr(&request, allocator, forkchoice, ctx, connection.address);
+    connection.stream.close();
+}
+
+fn handleNonSSERequestWithAddr(
+    request: *std.http.Server.Request,
+    allocator: std.mem.Allocator,
+    forkchoice: *node.fcFactory.ForkChoice,
+    ctx: *SimpleMetricsServer,
+    client_addr: std.net.Address,
+) void {
+    var arena = std.heap.ArenaAllocator.init(allocator);
+    defer arena.deinit();
+    const request_allocator = arena.allocator();
+
+    if (std.mem.eql(u8, request.head.target, "/metrics")) {
+        var metrics_output = std.ArrayList(u8).init(request_allocator);
         defer metrics_output.deinit();
 
         api.writeMetrics(metrics_output.writer()) catch {
@@ -56,45 +108,284 @@ fn handleConnection(connection: std.net.Server.Connection, allocator: std.mem.Al
             },
         }) catch {};
     } else if (std.mem.eql(u8, request.head.target, "/health")) {
-        // Handle health check
         const response = "{\"status\":\"healthy\",\"service\":\"zeam-metrics\"}";
         _ = request.respond(response, .{
             .extra_headers = &.{
                 .{ .name = "content-type", .value = "application/json; charset=utf-8" },
             },
         }) catch {};
+    } else if (std.mem.startsWith(u8, request.head.target, "/api/forkchoice/graph")) {
+        handleForkChoiceGraph(request, request_allocator, forkchoice, ctx, client_addr) catch |err| {
+            std.log.warn("Fork choice graph request failed: {}", .{err});
+            _ = request.respond("Internal Server Error\n", .{}) catch {};
+        };
     } else {
         _ = request.respond("Not Found\n", .{ .status = .not_found }) catch {};
     }
 }
 
+fn handleForkChoiceGraph(
+    request: *std.http.Server.Request,
+    allocator: std.mem.Allocator,
+    forkchoice: *node.fcFactory.ForkChoice,
+    ctx: *SimpleMetricsServer,
+    client_addr: std.net.Address,
+) !void {
+    // Per-IP token bucket + global in-flight cap for the graph endpoint.
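+    // The per-IP bucket is checked first so abusive clients are rejected before
+    // they can claim one of the MAX_GRAPH_INFLIGHT slots; a claimed slot is
+    // released via defer on every exit path.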
+ if (!ctx.rate_limiter.allow(client_addr)) { + _ = request.respond("Too Many Requests\n", .{ .status = .too_many_requests }) catch {}; + return; + } + if (!ctx.tryAcquireGraph()) { + _ = request.respond("Too Many Requests\n", .{ .status = .too_many_requests }) catch {}; + return; + } + defer ctx.releaseGraph(); + + var max_slots: usize = DEFAULT_MAX_SLOTS; + if (std.mem.indexOf(u8, request.head.target, QUERY_SLOTS_PREFIX)) |query_start| { + const slots_param = request.head.target[query_start + QUERY_SLOTS_PREFIX.len ..]; + if (std.mem.indexOf(u8, slots_param, "&")) |end| { + max_slots = std.fmt.parseInt(usize, slots_param[0..end], 10) catch DEFAULT_MAX_SLOTS; + } else { + max_slots = std.fmt.parseInt(usize, slots_param, 10) catch DEFAULT_MAX_SLOTS; + } + } + + if (max_slots > MAX_ALLOWED_SLOTS) max_slots = MAX_ALLOWED_SLOTS; + + var graph_json = std.ArrayList(u8).init(allocator); + defer graph_json.deinit(); + + try buildGraphJSON(forkchoice, graph_json.writer(), max_slots, allocator); + + _ = request.respond(graph_json.items, .{ + .extra_headers = &.{ + .{ .name = "content-type", .value = "application/json; charset=utf-8" }, + .{ .name = "access-control-allow-origin", .value = "*" }, + }, + }) catch {}; +} + +/// Build fork choice graph in Grafana node-graph JSON format +fn buildGraphJSON( + forkchoice: *node.fcFactory.ForkChoice, + writer: anytype, + max_slots: usize, + allocator: std.mem.Allocator, +) !void { + const snapshot = try forkchoice.snapshot(allocator); + defer snapshot.deinit(allocator); + + const proto_nodes = snapshot.nodes; + + // Determine the slot threshold (show only recent slots) + const current_slot = snapshot.head.slot; + const min_slot = if (current_slot > max_slots) current_slot - max_slots else 0; + + // Build nodes and edges + var nodes_list = std.ArrayList(u8).init(allocator); + defer nodes_list.deinit(); + var edges_list = std.ArrayList(u8).init(allocator); + defer edges_list.deinit(); + + var node_count: usize = 0; + var edge_count: usize = 0; + + // Find max weight for normalization + var max_weight: isize = 1; + for (proto_nodes) |pnode| { + if (pnode.slot >= min_slot and pnode.weight > max_weight) { + max_weight = pnode.weight; + } + } + + // Build nodes + // Find the finalized node index to check ancestry + const finalized_idx = blk: { + for (proto_nodes, 0..) |n, i| { + if (std.mem.eql(u8, &n.blockRoot, &snapshot.latest_finalized_root)) { + break :blk i; + } + } + break :blk null; + }; + + for (proto_nodes, 0..) |pnode, idx| { + if (pnode.slot < min_slot) continue; + + // Determine node role and color + const is_head = std.mem.eql(u8, &pnode.blockRoot, &snapshot.head.blockRoot); + const is_justified = std.mem.eql(u8, &pnode.blockRoot, &snapshot.latest_justified_root); + + // A block is finalized if: + // 1. It equals the finalized checkpoint, OR + // 2. The finalized block is a descendant of it (block is ancestor of finalized) + const is_finalized = blk: { + // Check if this block IS the finalized block + if (std.mem.eql(u8, &pnode.blockRoot, &snapshot.latest_finalized_root)) { + break :blk true; + } + // Check if this block is an ancestor of the finalized block + if (finalized_idx) |fin_idx| { + var current_idx: ?usize = fin_idx; + while (current_idx) |curr| { + if (curr == idx) break :blk true; + current_idx = proto_nodes[curr].parent; + } + } + break :blk false; + }; + + // Get finalized slot for orphaned block detection + const finalized_slot = if (finalized_idx) |fin_idx| proto_nodes[fin_idx].slot else 0; + + // A block is orphaned if: + // 1. 
It's at or before finalized slot, AND + // 2. It's NOT on the canonical chain (not finalized) + const is_orphaned = blk: { + // Only blocks at or before finalized slot can be orphaned + if (pnode.slot > finalized_slot) break :blk false; + // If already finalized (canonical), not orphaned + if (is_finalized) break :blk false; + + // If it's old enough to be finalized but isn't, it's orphaned + break :blk true; + }; + + const role = if (is_finalized) + "finalized" + else if (is_justified) + "justified" + else if (is_head) + "head" + else if (is_orphaned) + "orphaned" + else + "normal"; + + // Normalized weight for arc (0.0 to 1.0, draws partial circle border) + // Represents fraction of circle filled (0.5 = half circle, 1.0 = full circle) + const arc_weight: f64 = if (max_weight > 0) + @as(f64, @floatFromInt(pnode.weight)) / @as(f64, @floatFromInt(max_weight)) + else + 0.0; + + // Use separate arc fields for each color (only one is set per node, others are 0) + // This allows manual arc section configuration with explicit colors + // TODO: Use chain.forkChoice.isBlockTimely(blockDelayMs) once implemented + // For now, treat all non-finalized/non-justified/non-head/non-orphaned blocks as timely + const arc_timely: f64 = if (!is_finalized and !is_justified and !is_head and !is_orphaned) arc_weight else 0.0; + const arc_head: f64 = if (is_head) arc_weight else 0.0; + const arc_justified: f64 = if (is_justified) arc_weight else 0.0; + const arc_finalized: f64 = if (is_finalized) arc_weight else 0.0; + const arc_orphaned: f64 = if (is_orphaned) arc_weight else 0.0; + + // Block root as hex + const hex_prefix = try std.fmt.allocPrint(allocator, "{s}", .{std.fmt.fmtSliceHexLower(pnode.blockRoot[0..4])}); + defer allocator.free(hex_prefix); + const full_root = try std.fmt.allocPrint(allocator, "{s}", .{std.fmt.fmtSliceHexLower(&pnode.blockRoot)}); + defer allocator.free(full_root); + + if (node_count > 0) { + try nodes_list.appendSlice(","); + } + + try std.fmt.format(nodes_list.writer(), + \\{{"id":"{s}","title":"Slot {d}","mainStat":"{d}","secondaryStat":"{d}","arc__timely":{d:.4},"arc__head":{d:.4},"arc__justified":{d:.4},"arc__finalized":{d:.4},"arc__orphaned":{d:.4},"detail__role":"{s}","detail__hex_prefix":"{s}"}} + , .{ + full_root, + pnode.slot, + pnode.weight, + pnode.slot, + arc_timely, + arc_head, + arc_justified, + arc_finalized, + arc_orphaned, + role, + hex_prefix, + }); + + node_count += 1; + + // Build edges (parent -> child relationships) + if (pnode.parent) |parent_idx| { + const parent_node = proto_nodes[parent_idx]; + if (parent_node.slot >= min_slot) { + const parent_root = try std.fmt.allocPrint(allocator, "{s}", .{std.fmt.fmtSliceHexLower(&parent_node.blockRoot)}); + defer allocator.free(parent_root); + + const is_best_child = if (parent_node.bestChild) |bc| bc == idx else false; + + if (edge_count > 0) { + try edges_list.appendSlice(","); + } + + try std.fmt.format(edges_list.writer(), + \\{{"id":"edge_{d}","source":"{s}","target":"{s}","mainStat":"","detail__is_best_child":{}}} + , .{ + edge_count, + parent_root, + full_root, + is_best_child, + }); + + edge_count += 1; + } + } + } + + // Write final JSON + try std.fmt.format(writer, + \\{{"nodes":[{s}],"edges":[{s}]}} + , .{ nodes_list.items, edges_list.items }); +} + /// Simple metrics server context const SimpleMetricsServer = struct { allocator: std.mem.Allocator, port: u16, + forkchoice: *node.fcFactory.ForkChoice, + stop: std.atomic.Value(bool), + sse_active: usize, + graph_inflight: usize, + rate_limiter: RateLimiter, 
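+    // sse_active and graph_inflight are guarded by the dedicated mutexes below,
+    // so SSE accounting never contends with graph request admission.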
+ sse_mutex: std.Thread.Mutex = .{}, + graph_mutex: std.Thread.Mutex = .{}, fn run(self: *SimpleMetricsServer) !void { - // `startMetricsServer` creates this, so we need to free it here defer self.allocator.destroy(self); + defer self.rate_limiter.deinit(); const address = try std.net.Address.parseIp4("0.0.0.0", self.port); - var server = try address.listen(.{ .reuse_address = true }); + var server = try address.listen(.{ .reuse_address = true, .force_nonblocking = true }); defer server.deinit(); std.log.info("HTTP server listening on http://0.0.0.0:{d}", .{self.port}); while (true) { - const connection = server.accept() catch continue; - - // For SSE connections, we need to handle them differently - // We'll spawn a new thread for each connection to handle persistence - _ = std.Thread.spawn(.{}, handleConnection, .{ connection, self.allocator }) catch |err| { - std.log.warn("Failed to spawn connection handler: {}", .{err}); - connection.stream.close(); + if (self.stop.load(.acquire)) break; + const connection = server.accept() catch |err| { + if (err == error.WouldBlock) { + std.time.sleep(ACCEPT_POLL_NS); + continue; + } + std.log.warn("Failed to accept connection: {}", .{err}); continue; }; + + routeConnection(connection, self.allocator, self.forkchoice, self); } } + fn handleSSEConnection(stream: std.net.Stream, allocator: std.mem.Allocator, ctx: *SimpleMetricsServer) void { + SimpleMetricsServer.handleSSEEvents(stream, allocator) catch |err| { + std.log.warn("SSE connection failed: {}", .{err}); + }; + stream.close(); + ctx.releaseSSE(); + } + fn handleSSEEvents(stream: std.net.Stream, allocator: std.mem.Allocator) !void { _ = allocator; // Set SSE headers manually by writing HTTP response @@ -130,4 +421,140 @@ const SimpleMetricsServer = struct { std.time.sleep(constants.SSE_HEARTBEAT_SECONDS * std.time.ns_per_s); } } + + fn tryAcquireSSE(self: *SimpleMetricsServer) bool { + self.sse_mutex.lock(); + defer self.sse_mutex.unlock(); + // Limit long-lived SSE connections to avoid unbounded threads. + if (self.sse_active >= MAX_SSE_CONNECTIONS) return false; + self.sse_active += 1; + return true; + } + + fn releaseSSE(self: *SimpleMetricsServer) void { + self.sse_mutex.lock(); + defer self.sse_mutex.unlock(); + if (self.sse_active > 0) self.sse_active -= 1; + } + + fn tryAcquireGraph(self: *SimpleMetricsServer) bool { + self.graph_mutex.lock(); + defer self.graph_mutex.unlock(); + // Cap concurrent graph JSON generation. 
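+        // Each graph request copies a full fork choice snapshot, so a small cap
+        // bounds both memory use and time spent under the fork choice shared lock.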
+ if (self.graph_inflight >= MAX_GRAPH_INFLIGHT) return false; + self.graph_inflight += 1; + return true; + } + + fn releaseGraph(self: *SimpleMetricsServer) void { + self.graph_mutex.lock(); + defer self.graph_mutex.unlock(); + if (self.graph_inflight > 0) self.graph_inflight -= 1; + } +}; + +const RateLimitEntry = struct { + tokens: f64, + last_refill_ns: u64, +}; + +const RateLimiter = struct { + allocator: std.mem.Allocator, + entries: std.StringHashMap(RateLimitEntry), + mutex: std.Thread.Mutex = .{}, + last_cleanup_ns: u64 = 0, + + fn init(allocator: std.mem.Allocator) !RateLimiter { + return .{ + .allocator = allocator, + .entries = std.StringHashMap(RateLimitEntry).init(allocator), + }; + } + + fn deinit(self: *RateLimiter) void { + var it = self.entries.iterator(); + while (it.next()) |entry| { + self.allocator.free(entry.key_ptr.*); + } + self.entries.deinit(); + } + + fn allow(self: *RateLimiter, addr: std.net.Address) bool { + const now_signed = std.time.nanoTimestamp(); + const now: u64 = if (now_signed > 0) @intCast(now_signed) else 0; + var key_buf: [64]u8 = undefined; + const key = addrToKey(&key_buf, addr) orelse return true; + + self.mutex.lock(); + defer self.mutex.unlock(); + + if (self.entries.count() > RATE_LIMIT_CLEANUP_THRESHOLD and now - self.last_cleanup_ns > RATE_LIMIT_CLEANUP_COOLDOWN_NS) { + // Opportunistic TTL cleanup with cooldown to prevent repeated full scans on the hot path. + self.evictStale(now); + self.last_cleanup_ns = now; + } + + // Hard cap on tracked IPs to bound memory in long-lived processes. + if (self.entries.count() >= RATE_LIMIT_MAX_ENTRIES and !self.entries.contains(key)) { + return false; + } + + const entry = self.entries.getPtr(key) orelse blk: { + const owned_key = self.allocator.dupe(u8, key) catch return false; + _ = self.entries.put(owned_key, .{ .tokens = RATE_LIMIT_BURST, .last_refill_ns = now }) catch return false; + break :blk self.entries.getPtr(owned_key) orelse return false; + }; + + // Token bucket refill based on elapsed time. + const elapsed_ns = if (now > entry.last_refill_ns) now - entry.last_refill_ns else 0; + const refill = (@as(f64, @floatFromInt(elapsed_ns)) / @as(f64, @floatFromInt(std.time.ns_per_s))) * RATE_LIMIT_RPS; + entry.tokens = @min(RATE_LIMIT_BURST, entry.tokens + refill); + entry.last_refill_ns = now; + + if (entry.tokens < 1.0) return false; + entry.tokens -= 1.0; + return true; + } + + fn evictStale(self: *RateLimiter, now: u64) void { + // Collect keys first to avoid mutating the map while iterating. + var to_remove = std.ArrayList([]const u8).init(self.allocator); + defer to_remove.deinit(); + + var it = self.entries.iterator(); + while (it.next()) |entry| { + if (now - entry.value_ptr.last_refill_ns > RATE_LIMIT_STALE_NS) { + to_remove.append(entry.key_ptr.*) catch continue; + } + } + + for (to_remove.items) |key| { + if (self.entries.remove(key)) { + self.allocator.free(key); + } + } + } }; + +fn addrToKey(buf: []u8, addr: std.net.Address) ?[]const u8 { + return switch (addr.any.family) { + std.posix.AF.INET => blk: { + // IPv4 as dotted quad. + const bytes = @as(*const [4]u8, @ptrCast(&addr.in.sa.addr)); + const len = std.fmt.bufPrint(buf, "{d}.{d}.{d}.{d}", .{ + bytes[0], + bytes[1], + bytes[2], + bytes[3], + }) catch return null; + break :blk len; + }, + std.posix.AF.INET6 => blk: { + const ip6 = addr.in6.sa.addr; + // IPv6 as hex string. 
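+            // 16 address bytes render as 32 hex chars, well within the caller's
+            // 64-byte key buffer.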
+ const len = std.fmt.bufPrint(buf, "{s}", .{std.fmt.fmtSliceHexLower(&ip6)}) catch return null; + break :blk len; + }, + else => null, + }; +} diff --git a/pkgs/cli/src/main.zig b/pkgs/cli/src/main.zig index 1a666706..06806d84 100644 --- a/pkgs/cli/src/main.zig +++ b/pkgs/cli/src/main.zig @@ -301,12 +301,6 @@ fn mainInner() !void { return err; }; - // Start metrics HTTP server - api_server.startAPIServer(allocator, beamcmd.metricsPort) catch |err| { - ErrorHandler.logErrorWithDetails(err, "start API server", .{ .port = beamcmd.metricsPort }); - return err; - }; - std.debug.print("beam opts ={any}\n", .{beamcmd}); const mock_network = beamcmd.mockNetwork; @@ -465,6 +459,9 @@ fn mainInner() !void { const registry_1 = shared_registry; const registry_2 = shared_registry; + var api_server_handle: ?api_server.APIServerHandle = null; + defer if (api_server_handle) |*handle| handle.stop(); + var beam_node_1: BeamNode = undefined; try beam_node_1.init(allocator, .{ // options @@ -480,6 +477,11 @@ fn mainInner() !void { .node_registry = registry_1, }); + api_server_handle = api_server.startAPIServer(allocator, beamcmd.metricsPort, &beam_node_1.chain.forkChoice) catch |err| { + ErrorHandler.logErrorWithDetails(err, "start API server", .{ .port = beamcmd.metricsPort }); + return err; + }; + var beam_node_2: BeamNode = undefined; try beam_node_2.init(allocator, .{ // options diff --git a/pkgs/cli/src/node.zig b/pkgs/cli/src/node.zig index 259b70be..fa648035 100644 --- a/pkgs/cli/src/node.zig +++ b/pkgs/cli/src/node.zig @@ -119,6 +119,7 @@ pub const Node = struct { logger: zeam_utils.ModuleLogger, db: database.Db, key_manager: key_manager_lib.KeyManager, + api_server_handle: ?api_server.APIServerHandle, const Self = @This(); @@ -129,13 +130,10 @@ pub const Node = struct { ) !void { self.allocator = allocator; self.options = options; - - // Initialize event broadcaster - try event_broadcaster.initGlobalBroadcaster(allocator); + self.api_server_handle = null; if (options.metrics_enable) { try api.init(allocator); - try api_server.startAPIServer(allocator, options.metrics_port); } // some base mainnet spec would be loaded to build this up @@ -202,10 +200,17 @@ pub const Node = struct { .node_registry = options.node_registry, }); + if (options.metrics_enable) { + self.api_server_handle = try api_server.startAPIServer(allocator, options.metrics_port, &self.beam_node.chain.forkChoice); + } + self.logger = options.logger_config.logger(.node); } pub fn deinit(self: *Self) void { + if (self.api_server_handle) |*handle| { + handle.stop(); + } self.clock.deinit(self.allocator); self.beam_node.deinit(); self.key_manager.deinit(); diff --git a/pkgs/node/src/chain.zig b/pkgs/node/src/chain.zig index 4ca82029..95bef205 100644 --- a/pkgs/node/src/chain.zig +++ b/pkgs/node/src/chain.zig @@ -21,6 +21,7 @@ const jsonToString = zeam_utils.jsonToString; const utils = @import("./utils.zig"); pub const fcFactory = @import("./forkchoice.zig"); const constants = @import("./constants.zig"); +const tree_visualizer = @import("./tree_visualizer.zig"); const networkFactory = @import("./network.zig"); const PeerInfo = networkFactory.PeerInfo; @@ -171,12 +172,12 @@ pub const BeamChain = struct { // interval to attest so we should put out the chain status information to the user along with // latest head which most likely should be the new block received and processed const islot: isize = @intCast(slot); - self.printSlot(islot, self.connected_peers.count()); + self.printSlot(islot, null, self.connected_peers.count()); // Periodic 
pruning: prune old non-canonical states every N slots
        // This ensures we prune even when finalization doesn't advance
        if (slot > 0 and slot % constants.FORKCHOICE_PRUNING_INTERVAL_SLOTS == 0) {
-            const finalized = self.forkChoice.fcStore.latest_finalized;
+            const finalized = self.forkChoice.getLatestFinalized();
             // no need to work extra if finalization is not far behind
             if (finalized.slot + 2 * constants.FORKCHOICE_PRUNING_INTERVAL_SLOTS < slot) {
                 self.module_logger.warn("finalization slot={d} too far behind the current slot={d}", .{ finalized.slot, slot });
@@ -324,8 +325,7 @@ pub const BeamChain = struct {
 
     pub fn constructAttestationData(self: *Self, opts: AttestationConstructionParams) !types.AttestationData {
         const slot = opts.slot;
-        // const head = try self.forkChoice.getProposalHead(slot);
-        const head_proto = self.forkChoice.head;
+        const head_proto = self.forkChoice.getHead();
         const head: types.Checkpoint = .{
             .root = head_proto.blockRoot,
             .slot = head_proto.slot,
@@ -333,7 +333,7 @@
         const head_str = try head.toJsonString(self.allocator);
         defer self.allocator.free(head_str);
 
-        const safe_target_proto = self.forkChoice.safeTarget;
+        const safe_target_proto = self.forkChoice.getSafeTarget();
         const safe_target: types.Checkpoint = .{
             .root = safe_target_proto.blockRoot,
             .slot = safe_target_proto.slot,
@@ -357,13 +357,13 @@
             .slot = slot,
             .head = head,
             .target = target,
-            .source = self.forkChoice.fcStore.latest_justified,
+            .source = self.forkChoice.getLatestJustified(),
         };
 
         return attestation_data;
     }
 
-    pub fn printSlot(self: *Self, islot: isize, peer_count: usize) void {
+    pub fn printSlot(self: *Self, islot: isize, tree_depth: ?usize, peer_count: usize) void {
        // head should be auto updated when a block is received or a block proposal is done
        // however it doesn't get updated unless updateHead is called even though process block
        // logs show it has been updated. 
debug and fix the call below @@ -373,19 +373,30 @@ pub const BeamChain = struct { return; } else - self.forkChoice.head; + self.forkChoice.getHead(); // Get additional chain information - const justified = self.forkChoice.fcStore.latest_justified; - const finalized = self.forkChoice.fcStore.latest_finalized; + const justified = self.forkChoice.getLatestJustified(); + const finalized = self.forkChoice.getLatestFinalized(); // Calculate chain progress const slot: usize = if (islot < 0) 0 else @intCast(islot); const blocks_behind = if (slot > fc_head.slot) slot - fc_head.slot else 0; const is_timely = fc_head.timeliness; + // Build tree visualization (thread-safe snapshot) + var arena = std.heap.ArenaAllocator.init(self.allocator); + defer arena.deinit(); + const tree_visual = blk: { + const snapshot = self.forkChoice.snapshot(arena.allocator()) catch { + break :blk "Failed to get fork choice snapshot"; + }; + defer snapshot.deinit(arena.allocator()); + break :blk tree_visualizer.buildTreeVisualization(arena.allocator(), snapshot.nodes, tree_depth) catch "Tree visualization failed"; + }; + const states_count = self.states.count(); - const fc_nodes_count = self.forkChoice.protoArray.nodes.items.len; + const fc_nodes_count = self.forkChoice.getNodeCount(); self.module_logger.debug("cached states={d}, forkchoice nodes={d}", .{ states_count, fc_nodes_count }); self.module_logger.info( @@ -402,6 +413,9 @@ pub const BeamChain = struct { \\+---------------------------------------------------------------+ \\ Latest Justified: Slot {d:>6} | Root: 0x{any} \\ Latest Finalized: Slot {d:>6} | Root: 0x{any} + \\+---------------------------------------------------------------+ + \\ ForkChoice Tree: + \\{s} \\+===============================================================+ \\ , .{ @@ -417,6 +431,7 @@ pub const BeamChain = struct { std.fmt.fmtSliceHexLower(&justified.root), finalized.slot, std.fmt.fmtSliceHexLower(&finalized.root), + tree_visual, }); } @@ -634,7 +649,7 @@ pub const BeamChain = struct { pub fn onBlockFollowup(self: *Self, pruneForkchoice: bool) void { // 8. Asap emit new events via SSE (use forkchoice ProtoBlock directly) - const new_head = self.forkChoice.head; + const new_head = self.forkChoice.getHead(); if (api.events.NewHeadEvent.fromProtoBlock(self.allocator, new_head)) |head_event| { var chain_event = api.events.ChainEvent{ .new_head = head_event }; event_broadcaster.broadcastGlobalEvent(&chain_event) catch |err| { @@ -645,9 +660,8 @@ pub const BeamChain = struct { self.module_logger.warn("failed to create head event: {any}", .{err}); } - const store = self.forkChoice.fcStore; - const latest_justified = store.latest_justified; - const latest_finalized = store.latest_finalized; + const latest_justified = self.forkChoice.getLatestJustified(); + const latest_finalized = self.forkChoice.getLatestFinalized(); // 9. Asap emit justification/finalization events based on forkchoice store // Emit justification event only when slot increases beyond last emitted @@ -692,7 +706,7 @@ pub const BeamChain = struct { } const states_count_after_block = self.states.count(); - const fc_nodes_count_after_block = self.forkChoice.protoArray.nodes.items.len; + const fc_nodes_count_after_block = self.forkChoice.getNodeCount(); self.module_logger.info("completed on block followup with states_count={d} fc_nodes_count={d}", .{ states_count_after_block, fc_nodes_count_after_block, @@ -801,12 +815,11 @@ pub const BeamChain = struct { // 2. 
Put all newly finalized roots in DbFinalizedSlotsNamespace for (finalized_roots) |root| { - const idx = self.forkChoice.protoArray.indices.get(root) orelse return error.FinalizedBlockNotInForkChoice; - const node = self.forkChoice.protoArray.nodes.items[idx]; - batch.putFinalizedSlotIndex(database.DbFinalizedSlotsNamespace, node.slot, root); + const slot = self.forkChoice.getBlockSlot(root) orelse return error.FinalizedBlockNotInForkChoice; + batch.putFinalizedSlotIndex(database.DbFinalizedSlotsNamespace, slot, root); self.module_logger.debug("added block 0x{s} at slot {d} to finalized index", .{ std.fmt.fmtSliceHexLower(&root), - node.slot, + slot, }); } @@ -888,31 +901,27 @@ pub const BeamChain = struct { defer _ = timer.observe(); const data = attestation.data; - // 1. Validate that source, target, and head blocks exist in proto array - const source_idx = self.forkChoice.protoArray.indices.get(data.source.root) orelse { - self.module_logger.debug("attestation validation failed: unknown source block root=0x{s}", .{ + // 1. Validate that source, target, and head blocks exist in proto array (thread-safe) + const source_block = self.forkChoice.getProtoNode(data.source.root) orelse { + self.module_logger.debug("Attestation validation failed: unknown source block root=0x{s}", .{ std.fmt.fmtSliceHexLower(&data.source.root), }); return AttestationValidationError.UnknownSourceBlock; }; - const target_idx = self.forkChoice.protoArray.indices.get(data.target.root) orelse { - self.module_logger.debug("attestation validation failed: unknown target block root=0x{s}", .{ + const target_block = self.forkChoice.getProtoNode(data.target.root) orelse { + self.module_logger.debug("Attestation validation failed: unknown target block root=0x{s}", .{ std.fmt.fmtSliceHexLower(&data.target.root), }); return AttestationValidationError.UnknownTargetBlock; }; - const head_idx = self.forkChoice.protoArray.indices.get(data.head.root) orelse { - self.module_logger.debug("attestation validation failed: unknown head block root=0x{s}", .{ + const head_block = self.forkChoice.getProtoNode(data.head.root) orelse { + self.module_logger.debug("Attestation validation failed: unknown head block root=0x{s}", .{ std.fmt.fmtSliceHexLower(&data.head.root), }); return AttestationValidationError.UnknownHeadBlock; }; - - const source_block = self.forkChoice.protoArray.nodes.items[source_idx]; - const target_block = self.forkChoice.protoArray.nodes.items[target_idx]; - const head_block = self.forkChoice.protoArray.nodes.items[head_idx]; _ = head_block; // Will be used in future validations // 2. Validate slot relationships @@ -956,7 +965,7 @@ pub const BeamChain = struct { // Gossip attestations must be for current or past slots only. Validators attest // in interval 1 of the current slot, so they cannot attest for future slots. // Block attestations can be more lenient since the block itself was validated. 
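+        // getCurrentSlot() reads fcStore.timeSlots under the fork choice shared lock.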
- const current_slot = self.forkChoice.fcStore.timeSlots; + const current_slot = self.forkChoice.getCurrentSlot(); const max_allowed_slot = if (is_from_block) current_slot + constants.MAX_FUTURE_SLOT_TOLERANCE // Block attestations: allow +1 else @@ -992,8 +1001,8 @@ pub const BeamChain = struct { } pub fn getStatus(self: *Self) types.Status { - const finalized = self.forkChoice.fcStore.latest_finalized; - const head = self.forkChoice.head; + const finalized = self.forkChoice.getLatestFinalized(); + const head = self.forkChoice.getHead(); return .{ .finalized_root = finalized.root, @@ -1064,9 +1073,9 @@ test "process and add mock blocks into a node's chain" { var beam_chain = try BeamChain.init(allocator, ChainOpts{ .config = chain_config, .anchorState = &beam_state, .nodeId = nodeId, .logger_config = &zeam_logger_config, .db = db, .node_registry = test_registry }, connected_peers); defer beam_chain.deinit(); - try std.testing.expect(std.mem.eql(u8, &beam_chain.forkChoice.fcStore.latest_finalized.root, &mock_chain.blockRoots[0])); + try std.testing.expect(std.mem.eql(u8, &beam_chain.forkChoice.getLatestFinalized().root, &mock_chain.blockRoots[0])); try std.testing.expect(beam_chain.forkChoice.protoArray.nodes.items.len == 1); - try std.testing.expect(std.mem.eql(u8, &beam_chain.forkChoice.fcStore.latest_finalized.root, &beam_chain.forkChoice.protoArray.nodes.items[0].blockRoot)); + try std.testing.expect(std.mem.eql(u8, &beam_chain.forkChoice.getLatestFinalized().root, &beam_chain.forkChoice.protoArray.nodes.items[0].blockRoot)); try std.testing.expect(std.mem.eql(u8, mock_chain.blocks[0].message.block.state_root[0..], &beam_chain.forkChoice.protoArray.nodes.items[0].stateRoot)); try std.testing.expect(std.mem.eql(u8, &mock_chain.blockRoots[0], &beam_chain.forkChoice.protoArray.nodes.items[0].blockRoot)); @@ -1093,9 +1102,9 @@ test "process and add mock blocks into a node's chain" { try std.testing.expect(std.mem.eql(u8, &state_root, &block.state_root)); // fcstore checkpoints should match - try std.testing.expect(std.mem.eql(u8, &beam_chain.forkChoice.fcStore.latest_justified.root, &mock_chain.latestJustified[i].root)); - try std.testing.expect(std.mem.eql(u8, &beam_chain.forkChoice.fcStore.latest_finalized.root, &mock_chain.latestFinalized[i].root)); - try std.testing.expect(std.mem.eql(u8, &beam_chain.forkChoice.head.blockRoot, &mock_chain.latestHead[i].root)); + try std.testing.expect(std.mem.eql(u8, &beam_chain.forkChoice.getLatestJustified().root, &mock_chain.latestJustified[i].root)); + try std.testing.expect(std.mem.eql(u8, &beam_chain.forkChoice.getLatestFinalized().root, &mock_chain.latestFinalized[i].root)); + try std.testing.expect(std.mem.eql(u8, &beam_chain.forkChoice.getHead().blockRoot, &mock_chain.latestHead[i].root)); } const num_validators: usize = @intCast(mock_chain.genesis_config.numValidators()); @@ -1163,16 +1172,16 @@ test "printSlot output demonstration" { // Test printSlot at different slots to see the output std.debug.print("\n=== PRINTING CHAIN STATUS AT SLOT 0 ===\n", .{}); - beam_chain.printSlot(0, beam_chain.connected_peers.count()); + beam_chain.printSlot(0, null, beam_chain.connected_peers.count()); std.debug.print("\n=== PRINTING CHAIN STATUS AT SLOT 1 ===\n", .{}); - beam_chain.printSlot(1, beam_chain.connected_peers.count()); + beam_chain.printSlot(1, null, beam_chain.connected_peers.count()); std.debug.print("\n=== PRINTING CHAIN STATUS AT SLOT 2 ===\n", .{}); - beam_chain.printSlot(2, beam_chain.connected_peers.count()); + beam_chain.printSlot(2, 
null, beam_chain.connected_peers.count()); std.debug.print("\n=== PRINTING CHAIN STATUS AT SLOT 5 (BEHIND) ===\n", .{}); - beam_chain.printSlot(5, beam_chain.connected_peers.count()); + beam_chain.printSlot(5, null, beam_chain.connected_peers.count()); // Verify that the chain state is as expected try std.testing.expect(beam_chain.forkChoice.protoArray.nodes.items.len == mock_chain.blocks.len); diff --git a/pkgs/node/src/forkchoice.zig b/pkgs/node/src/forkchoice.zig index e0596afc..fb9c2b07 100644 --- a/pkgs/node/src/forkchoice.zig +++ b/pkgs/node/src/forkchoice.zig @@ -1,6 +1,7 @@ const std = @import("std"); const json = std.json; const Allocator = std.mem.Allocator; +const Thread = std.Thread; const ssz = @import("ssz"); const types = @import("@zeam/types"); @@ -85,7 +86,8 @@ pub const ProtoArray = struct { } } - pub fn applyDeltas(self: *Self, deltas: []isize, cutoff_weight: u64) !void { + // Internal unlocked version - assumes caller holds lock + fn applyDeltasUnlocked(self: *Self, deltas: []isize, cutoff_weight: u64) !void { if (deltas.len != self.nodes.items.len) { return ForkChoiceError.InvalidDeltas; } @@ -220,8 +222,22 @@ pub const ForkChoice = struct { // get added deltas: std.ArrayList(isize), logger: zeam_utils.ModuleLogger, + // Thread-safe access protection + mutex: Thread.RwLock, const Self = @This(); + + /// Thread-safe snapshot for observability + pub const Snapshot = struct { + head: ProtoNode, + latest_justified_root: [32]u8, + latest_finalized_root: [32]u8, + nodes: []ProtoNode, + + pub fn deinit(self: Snapshot, allocator: Allocator) void { + allocator.free(self.nodes); + } + }; pub fn init(allocator: Allocator, opts: ForkChoiceParams) !Self { const anchor_block_header = try opts.anchorState.genStateBlockHeader(allocator); var anchor_block_root: [32]u8 = undefined; @@ -262,11 +278,54 @@ pub const ForkChoice = struct { .safeTarget = anchor_block, .deltas = deltas, .logger = opts.logger, + .mutex = Thread.RwLock{}, }; - _ = try fc.updateHead(); + // No lock needed during init - struct not yet accessible to other threads + _ = try fc.updateHeadUnlocked(); return fc; } + /// Thread-safe snapshot for observability + /// Holds shared lock only during copy, caller formats JSON lock-free + pub fn snapshot(self: *Self, allocator: Allocator) !Snapshot { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + + // Quick copy - ProtoNode has no pointer members, shallow copy is safe + const nodes_copy = try allocator.alloc(ProtoNode, self.protoArray.nodes.items.len); + @memcpy(nodes_copy, self.protoArray.nodes.items); + + // Get the full ProtoNode for head from protoArray + const head_idx = self.protoArray.indices.get(self.head.blockRoot) orelse { + // Fallback: create a ProtoNode from ProtoBlock if not found + const head_node = ProtoNode{ + .slot = self.head.slot, + .blockRoot = self.head.blockRoot, + .parentRoot = self.head.parentRoot, + .stateRoot = self.head.stateRoot, + .timeliness = self.head.timeliness, + .confirmed = self.head.confirmed, + .parent = null, + .weight = 0, + .bestChild = null, + .bestDescendant = null, + }; + return Snapshot{ + .head = head_node, + .latest_justified_root = self.fcStore.latest_justified.root, + .latest_finalized_root = self.fcStore.latest_finalized.root, + .nodes = nodes_copy, + }; + }; + + return Snapshot{ + .head = self.protoArray.nodes.items[head_idx], + .latest_justified_root = self.fcStore.latest_justified.root, + .latest_finalized_root = self.fcStore.latest_finalized.root, + .nodes = nodes_copy, + }; + } + fn isBlockTimely(self: 
*Self, blockDelayMs: usize) bool { _ = self; _ = blockDelayMs; @@ -301,7 +360,8 @@ pub const ForkChoice = struct { /// Builds a canonical view hashmap containing all blocks in the canonical chain /// from targetAnchor back to prevAnchor, plus all their unfinalized descendants. - pub fn getCanonicalView(self: *Self, canonical_view: *std.AutoHashMap(types.Root, void), targetAnchorRoot: types.Root, prevAnchorRootOrNull: ?types.Root) !void { + // Internal unlocked version - assumes caller holds lock + fn getCanonicalViewUnlocked(self: *Self, canonical_view: *std.AutoHashMap(types.Root, void), targetAnchorRoot: types.Root, prevAnchorRootOrNull: ?types.Root) !void { const prev_anchor_idx = if (prevAnchorRootOrNull) |prevAnchorRoot| (self.protoArray.indices.get(prevAnchorRoot) orelse return ForkChoiceError.InvalidAnchor) else 0; const target_anchor_idx = self.protoArray.indices.get(targetAnchorRoot) orelse return ForkChoiceError.InvalidTargetAnchor; @@ -346,7 +406,8 @@ pub const ForkChoice = struct { /// - non_canonical_roots: Blocks not in the canonical set (orphans) /// /// If canonicalViewOrNull is provided, it reuses an existing canonical view for efficiency. - pub fn getCanonicalityAnalysis(self: *Self, targetAnchorRoot: types.Root, prevAnchorRootOrNull: ?types.Root, canonicalViewOrNull: ?*std.AutoHashMap(types.Root, void)) ![3][]types.Root { + // Internal unlocked version - assumes caller holds lock + fn getCanonicalityAnalysisUnlocked(self: *Self, targetAnchorRoot: types.Root, prevAnchorRootOrNull: ?types.Root, canonicalViewOrNull: ?*std.AutoHashMap(types.Root, void)) ![3][]types.Root { var canonical_roots = std.ArrayList(types.Root).init(self.allocator); var potential_canonical_roots = std.ArrayList(types.Root).init(self.allocator); var non_canonical_roots = std.ArrayList(types.Root).init(self.allocator); @@ -359,7 +420,7 @@ pub const ForkChoice = struct { // get all canonical view of the chain finalized and unfinalized anchored at the targetAnchorRoot var canonical_blocks = canonicalViewOrNull orelse blk: { var local_view = std.AutoHashMap(types.Root, void).init(self.allocator); - try self.getCanonicalView(&local_view, targetAnchorRoot, prevAnchorRootOrNull); + try self.getCanonicalViewUnlocked(&local_view, targetAnchorRoot, prevAnchorRootOrNull); break :blk &local_view; }; @@ -410,13 +471,14 @@ pub const ForkChoice = struct { } /// Rebases the forkchoice tree to a new anchor, pruning non-canonical blocks. - pub fn rebase(self: *Self, targetAnchorRoot: types.Root, canonicalViewOrNull: ?*std.AutoHashMap(types.Root, void)) !void { + // Internal unlocked version - assumes caller holds lock + fn rebaseUnlocked(self: *Self, targetAnchorRoot: types.Root, canonicalViewOrNull: ?*std.AutoHashMap(types.Root, void)) !void { const target_anchor_idx = self.protoArray.indices.get(targetAnchorRoot) orelse return ForkChoiceError.InvalidTargetAnchor; const target_anchor_slot = self.protoArray.nodes.items[target_anchor_idx].slot; var canonical_view = canonicalViewOrNull orelse blk: { var local_view = std.AutoHashMap(types.Root, void).init(self.allocator); - try self.getCanonicalView(&local_view, targetAnchorRoot, null); + try self.getCanonicalViewUnlocked(&local_view, targetAnchorRoot, null); break :blk &local_view; }; @@ -532,7 +594,8 @@ pub const ForkChoice = struct { /// Depth 0 returns the head itself. Traverses parent pointers (not slot arithmetic), /// so missed slots don't affect depth counting. If depth exceeds chain length, /// clamps to genesis. 
- pub fn getCanonicalAncestorAtDepth(self: *Self, min_depth: usize) !ProtoBlock { + // Internal unlocked version - assumes caller holds lock + fn getCanonicalAncestorAtDepthUnlocked(self: *Self, min_depth: usize) !ProtoBlock { var depth = min_depth; var current_idx = self.protoArray.indices.get(self.head.blockRoot) orelse return ForkChoiceError.InvalidHeadIndex; @@ -554,7 +617,8 @@ pub const ForkChoice = struct { return ancestor_at_depth; } - pub fn tickInterval(self: *Self, hasProposal: bool) !void { + // Internal unlocked version - assumes caller holds lock + fn tickIntervalUnlocked(self: *Self, hasProposal: bool) !void { self.fcStore.time += 1; const currentInterval = self.fcStore.time % constants.INTERVALS_PER_SLOT; @@ -562,28 +626,30 @@ pub const ForkChoice = struct { 0 => { self.fcStore.timeSlots += 1; if (hasProposal) { - _ = try self.acceptNewAttestations(); + _ = try self.acceptNewAttestationsUnlocked(); } }, 1 => {}, 2 => { - _ = try self.updateSafeTarget(); + _ = try self.updateSafeTargetUnlocked(); }, 3 => { - _ = try self.acceptNewAttestations(); + _ = try self.acceptNewAttestationsUnlocked(); }, else => @panic("invalid interval"), } self.logger.debug("forkchoice ticked to time(intervals)={d} slot={d}", .{ self.fcStore.time, self.fcStore.timeSlots }); } - pub fn onInterval(self: *Self, time_intervals: usize, has_proposal: bool) !void { + // Internal unlocked version - assumes caller holds lock + fn onIntervalUnlocked(self: *Self, time_intervals: usize, has_proposal: bool) !void { while (self.fcStore.time < time_intervals) { - try self.tickInterval(has_proposal and (self.fcStore.time + 1) == time_intervals); + try self.tickIntervalUnlocked(has_proposal and (self.fcStore.time + 1) == time_intervals); } } - pub fn acceptNewAttestations(self: *Self) !ProtoBlock { + // Internal unlocked version - assumes caller holds lock + fn acceptNewAttestationsUnlocked(self: *Self) !ProtoBlock { for (0..self.config.genesis.numValidators()) |validator_id| { var attestation_tracker = self.attestations.get(validator_id) orelse AttestationTracker{}; if (attestation_tracker.latestNew) |new_attestation| { @@ -595,27 +661,11 @@ pub const ForkChoice = struct { try self.attestations.put(validator_id, attestation_tracker); } - return self.updateHead(); + return self.updateHeadUnlocked(); } - pub fn getProposalHead(self: *Self, slot: types.Slot) !types.Checkpoint { - const time_intervals = slot * constants.INTERVALS_PER_SLOT; - // this could be called independently by the validator when its a separate process - // and FC would need to be protected by mutex to make it thread safe but for now - // this is deterministally called after the fc has been ticked ahead - // so the following call should be a no-op - try self.onInterval(time_intervals, true); - // accept any new attestations in case previous ontick was a no-op and either the validator - // wasn't registered or there have been new attestations - const head = try self.acceptNewAttestations(); - - return types.Checkpoint{ - .root = head.blockRoot, - .slot = head.slot, - }; - } - - pub fn getProposalAttestations(self: *Self) ![]types.SignedAttestation { + // Internal unlocked version - assumes caller holds lock + fn getProposalAttestationsUnlocked(self: *Self) ![]types.SignedAttestation { var included_attestations = std.ArrayList(types.SignedAttestation).init(self.allocator); const latest_justified = self.fcStore.latest_justified; @@ -636,7 +686,8 @@ pub const ForkChoice = struct { return included_attestations.toOwnedSlice(); } - pub fn 
getAttestationTarget(self: *Self) !types.Checkpoint { + // Internal unlocked version - assumes caller holds lock + fn getAttestationTargetUnlocked(self: *Self) !types.Checkpoint { var target_idx = self.protoArray.indices.get(self.head.blockRoot) orelse return ForkChoiceError.InvalidHeadIndex; const nodes = self.protoArray.nodes.items; @@ -657,7 +708,8 @@ pub const ForkChoice = struct { }; } - pub fn computeDeltas(self: *Self, from_known: bool) ![]isize { + // Internal unlocked version - assumes caller holds lock + fn computeDeltasUnlocked(self: *Self, from_known: bool) ![]isize { // prep the deltas data structure while (self.deltas.items.len < self.protoArray.nodes.items.len) { try self.deltas.append(0); @@ -690,9 +742,10 @@ pub const ForkChoice = struct { return self.deltas.items; } - pub fn computeFCHead(self: *Self, from_known: bool, cutoff_weight: u64) !ProtoBlock { - const deltas = try self.computeDeltas(from_known); - try self.protoArray.applyDeltas(deltas, cutoff_weight); + // Internal unlocked version - assumes caller holds lock + fn computeFCHeadUnlocked(self: *Self, from_known: bool, cutoff_weight: u64) !ProtoBlock { + const deltas = try self.computeDeltasUnlocked(from_known); + try self.protoArray.applyDeltasUnlocked(deltas, cutoff_weight); // head is the best descendant of latest justified const justified_idx = self.protoArray.indices.get(self.fcStore.latest_justified.root) orelse return ForkChoiceError.InvalidJustifiedRoot; @@ -715,20 +768,23 @@ pub const ForkChoice = struct { return fcHead; } - pub fn updateHead(self: *Self) !ProtoBlock { - self.head = try self.computeFCHead(true, 0); + // Internal unlocked version - assumes caller holds lock + fn updateHeadUnlocked(self: *Self) !ProtoBlock { + self.head = try self.computeFCHeadUnlocked(true, 0); // Update the lean_head_slot metric zeam_metrics.metrics.lean_head_slot.set(self.head.slot); return self.head; } - pub fn updateSafeTarget(self: *Self) !ProtoBlock { + // Internal unlocked version - assumes caller holds lock + fn updateSafeTargetUnlocked(self: *Self) !ProtoBlock { const cutoff_weight = try std.math.divCeil(u64, 2 * self.config.genesis.numValidators(), 3); - self.safeTarget = try self.computeFCHead(false, cutoff_weight); + self.safeTarget = try self.computeFCHeadUnlocked(false, cutoff_weight); return self.safeTarget; } - pub fn onAttestation(self: *Self, signed_attestation: types.SignedAttestation, is_from_block: bool) !void { + // Internal unlocked version - assumes caller holds lock + fn onAttestationUnlocked(self: *Self, signed_attestation: types.SignedAttestation, is_from_block: bool) !void { // Attestation validation is done by the caller (chain layer) // This function assumes the attestation has already been validated @@ -778,11 +834,12 @@ pub const ForkChoice = struct { } // we process state outside forkchoice onblock to parallize verifications and just use the post state here - pub fn onBlock(self: *Self, block: types.BeamBlock, state: *const types.BeamState, opts: OnBlockOpts) !ProtoBlock { + // Internal unlocked version - assumes caller holds lock + fn onBlockUnlocked(self: *Self, block: types.BeamBlock, state: *const types.BeamState, opts: OnBlockOpts) !ProtoBlock { const parent_root = block.parent_root; const slot = block.slot; - const parent_block_or_null = self.getBlock(parent_root); + const parent_block_or_null = self.getBlockUnlocked(parent_root); if (parent_block_or_null) |parent_block| { // we will use parent block later as per the finalization gadget _ = parent_block; @@ -825,7 +882,8 @@ pub const 
ForkChoice = struct { } } - pub fn confirmBlock(self: *Self, blockRoot: types.Root) !void { + // Internal unlocked version - assumes caller holds lock + fn confirmBlockUnlocked(self: *Self, blockRoot: types.Root) !void { if (self.protoArray.indices.get(blockRoot)) |block_idx| { self.protoArray.nodes.items[block_idx].confirmed = true; } else { @@ -833,17 +891,8 @@ pub const ForkChoice = struct { } } - pub fn hasBlock(self: *Self, blockRoot: types.Root) bool { - const block_or_null = self.getBlock(blockRoot); - // we can only say we have the block if its fully confirmed to be imported - if (block_or_null) |block| { - return (block.confirmed == true); - } - - return false; - } - - pub fn getBlock(self: *Self, blockRoot: types.Root) ?ProtoBlock { + // Internal unlocked version - assumes caller holds lock + fn getBlockUnlocked(self: *Self, blockRoot: types.Root) ?ProtoBlock { const nodeOrNull = self.protoArray.getNode(blockRoot); if (nodeOrNull) |node| { // TODO cast doesn't seem to be working find resolution @@ -861,6 +910,173 @@ pub const ForkChoice = struct { return null; } } + + // Internal unlocked version - assumes caller holds lock + fn hasBlockUnlocked(self: *Self, blockRoot: types.Root) bool { + return self.protoArray.indices.contains(blockRoot); + } + + // PUBLIC API - LOCK AT BOUNDARY + // These methods acquire locks and delegate to unlocked helpers + + pub fn updateHead(self: *Self) !ProtoBlock { + self.mutex.lock(); + defer self.mutex.unlock(); + return self.updateHeadUnlocked(); + } + + pub fn onBlock(self: *Self, block: types.BeamBlock, state: *const types.BeamState, opts: OnBlockOpts) !ProtoBlock { + self.mutex.lock(); + defer self.mutex.unlock(); + return self.onBlockUnlocked(block, state, opts); + } + + pub fn onInterval(self: *Self, time_intervals: usize, has_proposal: bool) !void { + self.mutex.lock(); + defer self.mutex.unlock(); + return self.onIntervalUnlocked(time_intervals, has_proposal); + } + + pub fn onAttestation(self: *Self, signed_attestation: types.SignedAttestation, is_from_block: bool) !void { + self.mutex.lock(); + defer self.mutex.unlock(); + return self.onAttestationUnlocked(signed_attestation, is_from_block); + } + + pub fn updateSafeTarget(self: *Self) !ProtoBlock { + self.mutex.lock(); + defer self.mutex.unlock(); + return self.updateSafeTargetUnlocked(); + } + + // READ-ONLY API - SHARED LOCK + + pub fn getProposalAttestations(self: *Self) ![]types.SignedAttestation { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + return self.getProposalAttestationsUnlocked(); + } + + pub fn getAttestationTarget(self: *Self) !types.Checkpoint { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + return self.getAttestationTargetUnlocked(); + } + + pub fn hasBlock(self: *Self, blockRoot: types.Root) bool { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + return self.hasBlockUnlocked(blockRoot); + } + + pub fn getBlock(self: *Self, blockRoot: types.Root) ?ProtoBlock { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + return self.getBlockUnlocked(blockRoot); + } + + pub fn getCanonicalView(self: *Self, canonical_view: *std.AutoHashMap(types.Root, void), targetAnchorRoot: types.Root, prevAnchorRootOrNull: ?types.Root) !void { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + return self.getCanonicalViewUnlocked(canonical_view, targetAnchorRoot, prevAnchorRootOrNull); + } + + pub fn getCanonicalityAnalysis(self: *Self, targetAnchorRoot: types.Root, prevAnchorRootOrNull: ?types.Root, canonicalViewOrNull: 
?*std.AutoHashMap(types.Root, void)) ![3][]types.Root { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + return self.getCanonicalityAnalysisUnlocked(targetAnchorRoot, prevAnchorRootOrNull, canonicalViewOrNull); + } + + pub fn rebase(self: *Self, targetAnchorRoot: types.Root, canonicalViewOrNull: ?*std.AutoHashMap(types.Root, void)) !void { + self.mutex.lock(); + defer self.mutex.unlock(); + return self.rebaseUnlocked(targetAnchorRoot, canonicalViewOrNull); + } + + pub fn getCanonicalAncestorAtDepth(self: *Self, min_depth: usize) !ProtoBlock { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + return self.getCanonicalAncestorAtDepthUnlocked(min_depth); + } + + pub fn confirmBlock(self: *Self, blockRoot: types.Root) !void { + self.mutex.lock(); + defer self.mutex.unlock(); + return self.confirmBlockUnlocked(blockRoot); + } + + pub fn computeDeltas(self: *Self, from_known: bool) ![]isize { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + return self.computeDeltasUnlocked(from_known); + } + + pub fn applyDeltas(self: *Self, deltas: []isize, cutoff_weight: u64) !void { + self.mutex.lock(); + defer self.mutex.unlock(); + return self.protoArray.applyDeltasUnlocked(deltas, cutoff_weight); + } + + // SAFE GETTERS FOR SHARED STATE + // These provide thread-safe access to internal state + + /// Get a copy of the current head block + pub fn getHead(self: *Self) ProtoBlock { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + return self.head; + } + + /// Get the current safe target block (thread-safe) + pub fn getSafeTarget(self: *Self) ProtoBlock { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + return self.safeTarget; + } + + /// Get the latest justified checkpoint + pub fn getLatestJustified(self: *Self) types.Checkpoint { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + return self.fcStore.latest_justified; + } + + /// Get the latest finalized checkpoint + pub fn getLatestFinalized(self: *Self) types.Checkpoint { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + return self.fcStore.latest_finalized; + } + + /// Get the current time in slots + pub fn getCurrentSlot(self: *Self) types.Slot { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + return self.fcStore.timeSlots; + } + + /// Check if a block exists and get its slot (thread-safe) + pub fn getBlockSlot(self: *Self, blockRoot: types.Root) ?types.Slot { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + const idx = self.protoArray.indices.get(blockRoot) orelse return null; + return self.protoArray.nodes.items[idx].slot; + } + + /// Get a ProtoNode by root (returns a copy) + pub fn getProtoNode(self: *Self, blockRoot: types.Root) ?ProtoNode { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + const idx = self.protoArray.indices.get(blockRoot) orelse return null; + return self.protoArray.nodes.items[idx]; + } + + /// Get the current number of nodes in the forkchoice tree + pub fn getNodeCount(self: *Self) usize { + self.mutex.lockShared(); + defer self.mutex.unlockShared(); + return self.protoArray.nodes.items.len; + } }; pub const ForkChoiceError = error{ @@ -1065,6 +1281,7 @@ test "getCanonicalAncestorAtDepth and getCanonicalityAnalysis" { .safeTarget = createTestProtoBlock(8, 0xFF, 0xEE), .deltas = std.ArrayList(isize).init(allocator), .logger = module_logger, + .mutex = Thread.RwLock{}, }; defer fork_choice.attestations.deinit(); defer fork_choice.deltas.deinit(); @@ -1379,6 +1596,7 @@ fn 
buildTestTreeWithMockChain(allocator: Allocator, mock_chain: anytype) !struct .safeTarget = createTestProtoBlock(8, 0xFF, 0xEE), .deltas = std.ArrayList(isize).init(allocator), .logger = module_logger, + .mutex = Thread.RwLock{}, }; return .{ @@ -1572,7 +1790,7 @@ test "rebase: bestChild and bestDescendant remapping" { // Apply deltas to establish weights and bestChild/bestDescendant const deltas = try ctx.fork_choice.computeDeltas(true); - try ctx.fork_choice.protoArray.applyDeltas(deltas, 0); + try ctx.fork_choice.applyDeltas(deltas, 0); // Verify pre-rebase bestChild/bestDescendant // C(2) should have bestChild=3(D) since D branch has all 4 votes @@ -1672,7 +1890,7 @@ test "rebase: weight preservation after rebase" { // Apply deltas to establish weights const deltas = try ctx.fork_choice.computeDeltas(true); - try ctx.fork_choice.protoArray.applyDeltas(deltas, 0); + try ctx.fork_choice.applyDeltas(deltas, 0); // Record pre-rebase weights for nodes that will remain const pre_rebase_weight_C = ctx.fork_choice.protoArray.nodes.items[2].weight; // C @@ -2327,6 +2545,7 @@ test "rebase: heavy attestation load - all validators tracked correctly" { .safeTarget = createTestProtoBlock(3, 0xDD, 0xCC), .deltas = std.ArrayList(isize).init(allocator), .logger = module_logger, + .mutex = Thread.RwLock{}, }; // Note: We don't defer proto_array.nodes/indices.deinit() here because they're // moved into fork_choice and will be deinitialized separately diff --git a/pkgs/node/src/lib.zig b/pkgs/node/src/lib.zig index bdca6c12..88a914bd 100644 --- a/pkgs/node/src/lib.zig +++ b/pkgs/node/src/lib.zig @@ -4,6 +4,9 @@ pub const Clock = clockFactory.Clock; const nodeFactory = @import("./node.zig"); pub const BeamNode = nodeFactory.BeamNode; +const chainFactory = @import("./chain.zig"); +pub const BeamChain = chainFactory.BeamChain; + pub const fcFactory = @import("./forkchoice.zig"); pub const constants = @import("./constants.zig"); diff --git a/pkgs/node/src/node.zig b/pkgs/node/src/node.zig index 49676a70..233fb002 100644 --- a/pkgs/node/src/node.zig +++ b/pkgs/node/src/node.zig @@ -737,7 +737,7 @@ pub const BeamNode = struct { const interval = @mod(itime_intervals, constants.INTERVALS_PER_SLOT); if (interval == 1) { - self.chain.printSlot(islot, self.network.getPeerCount()); + self.chain.printSlot(islot, null, self.network.getPeerCount()); } return; } diff --git a/pkgs/node/src/tree_visualizer.zig b/pkgs/node/src/tree_visualizer.zig new file mode 100644 index 00000000..e02e6893 --- /dev/null +++ b/pkgs/node/src/tree_visualizer.zig @@ -0,0 +1,108 @@ +const std = @import("std"); +const Allocator = std.mem.Allocator; +const fcFactory = @import("./forkchoice.zig"); + +/// Builds a tree visualization of the fork choice tree with optional depth limit +pub fn buildTreeVisualization(allocator: Allocator, nodes: []const fcFactory.ProtoNode, max_depth: ?usize) ![]const u8 { + var tree_lines = std.ArrayListUnmanaged(u8){}; + defer tree_lines.deinit(allocator); + + // Find root nodes (nodes with no parent) + var root_indices = std.ArrayList(usize).init(allocator); + defer root_indices.deinit(); + + for (nodes, 0..) 
|node, i| { + if (node.parent == null) { + try root_indices.append(i); + } + } + + // Build tree visualization starting from roots + for (root_indices.items) |root_idx| { + try visualizeTreeBranch(allocator, &tree_lines, nodes, root_idx, 0, "", max_depth); + } + + return tree_lines.toOwnedSlice(allocator); +} + +/// Recursively builds a tree branch visualization +fn visualizeTreeBranch(allocator: Allocator, tree_lines: *std.ArrayListUnmanaged(u8), nodes: []const fcFactory.ProtoNode, node_idx: usize, depth: usize, prefix: []const u8, max_depth: ?usize) !void { + const node = nodes[node_idx]; + const hex_root = try std.fmt.allocPrint(allocator, "{s}", .{std.fmt.fmtSliceHexLower(node.blockRoot[0..2])}); + defer allocator.free(hex_root); + + const node_line = try std.fmt.allocPrint(allocator, "{s}{s}({d})", .{ prefix, hex_root, node.slot }); + defer allocator.free(node_line); + + try tree_lines.appendSlice(allocator, node_line); + + // Check if we've reached the maximum depth + if (max_depth) |max| { + if (depth >= max) { + const truncated_comment = try std.fmt.allocPrint(allocator, " ... (truncated at depth {d})", .{max}); + defer allocator.free(truncated_comment); + try tree_lines.appendSlice(allocator, truncated_comment); + try tree_lines.append(allocator, '\n'); + return; + } + } + + var children = std.ArrayList(usize).init(allocator); + defer children.deinit(); + + for (nodes, 0..) |child_node, i| { + if (child_node.parent) |parent_idx| { + if (parent_idx == node_idx) { + try children.append(i); + } + } + } + + if (children.items.len > 0) { + const child_count_comment = try std.fmt.allocPrint(allocator, " has {d} child branch{s}", .{ children.items.len, if (children.items.len == 1) "" else "es" }); + defer allocator.free(child_count_comment); + try tree_lines.appendSlice(allocator, child_count_comment); + } + try tree_lines.append(allocator, '\n'); + + for (children.items, 0..) |child_idx, child_i| { + const child_node = nodes[child_idx]; + const is_last_child = child_i == children.items.len - 1; + + const indent = try createTreeIndent(allocator, depth, is_last_child); + defer allocator.free(indent); + + // Check for missing slots between parent and child + if (child_node.slot > node.slot + 1) { + const missing_slots = child_node.slot - node.slot - 1; + const missing_line = if (missing_slots == 1) + try std.fmt.allocPrint(allocator, "{s}[{d}] ─┘ ", .{ indent, node.slot + 1 }) + else + try std.fmt.allocPrint(allocator, "{s}[{d}..{d}] ─┘ ", .{ indent, node.slot + 1, child_node.slot - 1 }); + defer allocator.free(missing_line); + try tree_lines.appendSlice(allocator, missing_line); + } else { + try tree_lines.appendSlice(allocator, indent); + } + + // Recursively process child + try visualizeTreeBranch(allocator, tree_lines, nodes, child_idx, depth + 1, "", max_depth); + } +} + +/// Helper function to create proper tree indentation +fn createTreeIndent(allocator: Allocator, depth: usize, is_last_child: bool) ![]const u8 { + var indent = std.ArrayList(u8).init(allocator); + defer indent.deinit(); + + // Add indentation for each depth level + for (0..depth) |_| { + try indent.appendSlice(" "); + } + + // Add tree characters based on position + const tree_char = if (is_last_child) "└── " else "├── "; + try indent.appendSlice(tree_char); + + return indent.toOwnedSlice(); +}
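+
+// Note: children are found by scanning all nodes for each parent (O(n^2) over the
+// tree), which is fine for the bounded trees printed per slot; a parent -> children
+// index would be the next step if fork choice trees grow large.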