Skip to content
Open
1 change: 1 addition & 0 deletions build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ pub fn build(b: *Builder) !void {
});
zeam_utils.addImport("datetime", datetime);
zeam_utils.addImport("yaml", yaml);
zeam_utils.addImport("xev", xev);

// add zeam-params
const zeam_params = b.addModule("@zeam/params", .{
Expand Down
41 changes: 25 additions & 16 deletions pkgs/cli/src/main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const ChainConfig = configs.ChainConfig;
const Chain = configs.Chain;
const ChainOptions = configs.ChainOptions;

const utils_lib = @import("@zeam/utils");
const zeam_utils = @import("@zeam/utils");

const database = @import("@zeam/database");

Expand Down Expand Up @@ -191,14 +191,21 @@ fn mainInner() !void {

switch (opts.args.__commands__) {
.clock => {
var loop = xev.Loop.init(.{}) catch |err| {
var event_loop = zeam_utils.EventLoop.init(gpa.allocator()) catch |err| {
ErrorHandler.logErrorWithOperation(err, "initialize event loop");
return err;
};
var clock = Clock.init(gpa.allocator(), genesis, &loop) catch |err| {
defer {
event_loop.stop();
event_loop.deinit();
}
event_loop.startHandlers();

var clock = Clock.init(gpa.allocator(), genesis, &event_loop) catch |err| {
ErrorHandler.logErrorWithOperation(err, "initialize clock");
return err;
};

std.debug.print("clock {any}\n", .{clock});

clock.run() catch |err| {
Expand All @@ -208,7 +215,7 @@ fn mainInner() !void {
},
.prove => |provecmd| {
std.debug.print("distribution dir={s}\n", .{provecmd.dist_dir});
var zeam_logger_config = utils_lib.getLoggerConfig(null, null);
var zeam_logger_config = zeam_utils.getLoggerConfig(null, null);
const logger = zeam_logger_config.logger(.state_proving_manager);
const stf_logger = zeam_logger_config.logger(.state_transition);

Expand Down Expand Up @@ -307,17 +314,19 @@ fn mainInner() !void {
try anchorState.genGenesisState(gpa.allocator(), chain_config.genesis);
defer anchorState.deinit();

// TODO we seem to be needing one loop because then the events added to loop are not being fired
// in the order to which they have been added even with the an appropriate delay added
// behavior of this further needs to be investigated but for now we will share the same loop
const loop = try allocator.create(xev.Loop);
loop.* = try xev.Loop.init(.{});
var event_loop = try zeam_utils.EventLoop.init(allocator);
defer {
event_loop.stop();
event_loop.deinit();
}

event_loop.startHandlers();

try std.fs.cwd().makePath(beamcmd.data_dir);

// Create loggers first so they can be passed to network implementations
var logger1_config = utils_lib.getScopedLoggerConfig(.n1, console_log_level, utils_lib.FileBehaviourParams{ .fileActiveLevel = log_file_active_level, .filePath = beamcmd.data_dir, .fileName = log_filename, .monocolorFile = monocolor_file_log });
var logger2_config = utils_lib.getScopedLoggerConfig(.n2, console_log_level, utils_lib.FileBehaviourParams{ .fileActiveLevel = log_file_active_level, .filePath = beamcmd.data_dir, .fileName = log_filename, .monocolorFile = monocolor_file_log });
var logger1_config = zeam_utils.getScopedLoggerConfig(.n1, console_log_level, zeam_utils.FileBehaviourParams{ .fileActiveLevel = log_file_active_level, .filePath = beamcmd.data_dir, .fileName = log_filename, .monocolorFile = monocolor_file_log });
var logger2_config = zeam_utils.getScopedLoggerConfig(.n2, console_log_level, zeam_utils.FileBehaviourParams{ .fileActiveLevel = log_file_active_level, .filePath = beamcmd.data_dir, .fileName = log_filename, .monocolorFile = monocolor_file_log });

var backend1: networks.NetworkInterface = undefined;
var backend2: networks.NetworkInterface = undefined;
Expand All @@ -340,7 +349,7 @@ fn mainInner() !void {

if (mock_network) {
var network: *networks.Mock = try allocator.create(networks.Mock);
network.* = try networks.Mock.init(allocator, loop, logger1_config.logger(.network));
network.* = try networks.Mock.init(allocator, &event_loop, logger1_config.logger(.network));
backend1 = network.getNetworkInterface();
backend2 = network.getNetworkInterface();
logger1_config.logger(null).debug("--- mock gossip {any}", .{backend1.gossip});
Expand All @@ -351,7 +360,7 @@ fn mainInner() !void {
listen_addresses1 = try allocator.dupe(Multiaddr, &[_]Multiaddr{try Multiaddr.fromString(allocator, "/ip4/0.0.0.0/tcp/9001")});
const network_name1 = try allocator.dupe(u8, chain_config.spec.name);
errdefer allocator.free(network_name1);
network1.* = try networks.EthLibp2p.init(allocator, loop, .{
network1.* = try networks.EthLibp2p.init(allocator, &event_loop, .{
.networkId = 0,
.network_name = network_name1,
.local_private_key = &priv_key1,
Expand All @@ -368,7 +377,7 @@ fn mainInner() !void {
connect_peers = try allocator.dupe(Multiaddr, &[_]Multiaddr{try Multiaddr.fromString(allocator, "/ip4/127.0.0.1/tcp/9001")});
const network_name2 = try allocator.dupe(u8, chain_config.spec.name);
errdefer allocator.free(network_name2);
network2.* = try networks.EthLibp2p.init(allocator, loop, .{
network2.* = try networks.EthLibp2p.init(allocator, &event_loop, .{
.networkId = 1,
.network_name = network_name2,
.local_private_key = &priv_key2,
Expand All @@ -380,7 +389,7 @@ fn mainInner() !void {
}

var clock = try allocator.create(Clock);
clock.* = try Clock.init(allocator, chain_config.genesis.genesis_time, loop);
clock.* = try Clock.init(allocator, chain_config.genesis.genesis_time, &event_loop);

//one missing validator is by design
var validator_ids_1 = [_]usize{1};
Expand Down Expand Up @@ -459,7 +468,7 @@ fn mainInner() !void {
return err;
};

var zeam_logger_config = utils_lib.getLoggerConfig(console_log_level, utils_lib.FileBehaviourParams{ .fileActiveLevel = log_file_active_level, .filePath = leancmd.@"data-dir", .fileName = log_filename });
var zeam_logger_config = zeam_utils.getLoggerConfig(console_log_level, zeam_utils.FileBehaviourParams{ .fileActiveLevel = log_file_active_level, .filePath = leancmd.@"data-dir", .fileName = log_filename });

var start_options: node.NodeOptions = .{
.network_id = leancmd.network_id,
Expand Down
27 changes: 19 additions & 8 deletions pkgs/cli/src/node.zig
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub const NodeOptions = struct {
/// A Node that encapsulates the networking, blockchain, and validator functionalities.
/// It manages the event loop, network interface, clock, and beam node.
pub const Node = struct {
loop: xev.Loop,
event_loop: *zeam_utils.EventLoop,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we just keep event_loop since it already has the loop inside?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, I'll make event_loop the owner of loop

network: networks.EthLibp2p,
beam_node: BeamNode,
clock: Clock,
Expand Down Expand Up @@ -121,22 +121,25 @@ pub const Node = struct {
try anchorState.genGenesisState(allocator, chain_config.genesis);
errdefer anchorState.deinit();

// TODO we seem to be needing one loop because then the events added to loop are not being fired
// in the order to which they have been added even with the an appropriate delay added
// behavior of this further needs to be investigated but for now we will share the same loop
self.loop = try xev.Loop.init(.{});
self.event_loop = try allocator.create(zeam_utils.EventLoop);
errdefer allocator.destroy(self.event_loop);
self.event_loop.* = try zeam_utils.EventLoop.init(allocator);
errdefer self.event_loop.deinit();

// Start listening for async notifications from other threads
self.event_loop.startHandlers();

const addresses = try self.constructMultiaddrs();

self.network = try networks.EthLibp2p.init(allocator, &self.loop, .{
self.network = try networks.EthLibp2p.init(allocator, self.event_loop, .{
.networkId = options.network_id,
.network_name = chain_config.spec.name,
.listen_addresses = addresses.listen_addresses,
.connect_peers = addresses.connect_peers,
.local_private_key = options.local_priv_key,
}, options.logger_config.logger(.network));
errdefer self.network.deinit();
self.clock = try Clock.init(allocator, chain_config.genesis.genesis_time, &self.loop);
self.clock = try Clock.init(allocator, chain_config.genesis.genesis_time, self.event_loop);
errdefer self.clock.deinit(allocator);

var db = try database.Db.open(allocator, options.logger_config.logger(.database), options.database_path);
Expand Down Expand Up @@ -164,16 +167,24 @@ pub const Node = struct {
}

pub fn deinit(self: *Self) void {
// Stop the event loop first - this disarms async notifications
// and processes any pending work before we deinit components
self.event_loop.stop();

// Now it's safe to deinit components that use the loop
self.clock.deinit(self.allocator);
self.beam_node.deinit();
self.key_manager.deinit();
self.network.deinit();
self.enr.deinit();
self.db.deinit();
self.loop.deinit();
// Finally clean up the loop infrastructure
self.event_loop.deinit();
self.allocator.destroy(self.event_loop);
}

pub fn run(self: *Node) !void {
try self.event_loop.run(.until_done);
try self.network.run();
try self.beam_node.run();

Expand Down
9 changes: 4 additions & 5 deletions pkgs/network/src/ethlibp2p.zig
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,7 @@ export fn handleMsgFromRustBridge(zigHandler: *EthLibp2p, topic_str: [*:0]const

zigHandler.logger.debug("\network-{d}:: !!!handleMsgFromRustBridge topic={s}:: message={s} from bytes={any} \n", .{ zigHandler.params.networkId, std.mem.span(topic_str), message_str, message_bytes });

// TODO: figure out why scheduling on the loop is not working
zigHandler.gossipHandler.onGossip(&message, false) catch |e| {
zigHandler.gossipHandler.onGossip(&message) catch |e| {
zigHandler.logger.err("onGossip handling of message failed with error e={any}", .{e});
};
}
Expand Down Expand Up @@ -750,14 +749,14 @@ pub const EthLibp2p = struct {

pub fn init(
allocator: Allocator,
loop: *xev.Loop,
event_loop: *zeam_utils.EventLoop,
params: EthLibp2pParams,
logger: zeam_utils.ModuleLogger,
) !Self {
const owned_network_name = try allocator.dupe(u8, params.network_name);
errdefer allocator.free(owned_network_name);

const gossip_handler = try interface.GenericGossipHandler.init(allocator, loop, params.networkId, logger);
const gossip_handler = try interface.GenericGossipHandler.init(allocator, event_loop, params.networkId, logger);
errdefer gossip_handler.deinit();

const peer_event_handler = try interface.PeerEventHandler.init(allocator, params.networkId, logger);
Expand Down Expand Up @@ -856,7 +855,7 @@ pub const EthLibp2p = struct {

pub fn onGossip(ptr: *anyopaque, data: *const interface.GossipMessage) anyerror!void {
const self: *Self = @ptrCast(@alignCast(ptr));
return self.gossipHandler.onGossip(data, false);
return self.gossipHandler.onGossip(data);
}

pub fn sendRPCRequest(
Expand Down
Loading
Loading