11 changes: 11 additions & 0 deletions pkgs/cli/src/main.zig
@@ -437,6 +437,9 @@ fn mainInner() !void {
backend3 = network.getNetworkInterface();
logger1_config.logger(null).debug("--- mock gossip {any}", .{backend1.gossip});
} else {
const gossip_topics1 = try node_lib.buildGossipTopicSpecs(allocator, chain_config, null, false);
errdefer allocator.free(gossip_topics1);

network1 = try allocator.create(networks.EthLibp2p);
const key_pair1 = enr_lib.KeyPair.generate();
const priv_key1 = key_pair1.v4.toString();
@@ -455,10 +458,14 @@ fn mainInner() !void {
.listen_addresses = listen_addresses1,
.connect_peers = null,
.node_registry = test_registry1,
.gossip_topics = gossip_topics1,
}, logger1_config.logger(.network));
backend1 = network1.getNetworkInterface();

// init a new libp2p network here to connect with network1
const gossip_topics2 = try node_lib.buildGossipTopicSpecs(allocator, chain_config, null, false);
errdefer allocator.free(gossip_topics2);

network2 = try allocator.create(networks.EthLibp2p);
const key_pair2 = enr_lib.KeyPair.generate();
const priv_key2 = key_pair2.v4.toString();
@@ -478,11 +485,14 @@ fn mainInner() !void {
.listen_addresses = listen_addresses2,
.connect_peers = connect_peers,
.node_registry = test_registry2,
.gossip_topics = gossip_topics2,
}, logger2_config.logger(.network));
backend2 = network2.getNetworkInterface();

// init network3 for node 3 (delayed sync node)
network3 = try allocator.create(networks.EthLibp2p);
const gossip_topics3 = try node_lib.buildGossipTopicSpecs(allocator, chain_config, null, false);
errdefer allocator.free(gossip_topics3);
const key_pair3 = enr_lib.KeyPair.generate();
const priv_key3 = key_pair3.v4.toString();
listen_addresses3 = try allocator.dupe(Multiaddr, &[_]Multiaddr{try Multiaddr.fromString(allocator, "/ip4/0.0.0.0/tcp/9003")});
@@ -500,6 +510,7 @@ fn mainInner() !void {
.listen_addresses = listen_addresses3,
.connect_peers = connect_peers3,
.node_registry = test_registry3,
.gossip_topics = gossip_topics3,
}, logger3_config.logger(.network));
backend3 = network3.getNetworkInterface();
logger1_config.logger(null).debug("--- ethlibp2p gossip {any}", .{backend1.gossip});
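A note on the gossip topic wiring above: EthLibp2p.deinit frees params.gossip_topics (see the ethlibp2p.zig changes below), so each of the three networks is handed its own slice from buildGossipTopicSpecs rather than a shared one. The snippet below only restates the contract inferred from the call sites in this diff; the parameter and return types of buildGossipTopicSpecs are assumptions, not taken from node_lib.

```zig
// Inferred usage sketch (assumed signature): buildGossipTopicSpecs(allocator,
// chain_config, validator_ids, is_aggregator) returns an allocator-owned slice
// of interface.GossipTopicSpec.
const gossip_topics = try node_lib.buildGossipTopicSpecs(allocator, chain_config, null, false);
// Freed here only if a later setup step fails; on success, ownership moves to
// EthLibp2p via params.gossip_topics and is released in EthLibp2p.deinit.
errdefer allocator.free(gossip_topics);
```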
40 changes: 36 additions & 4 deletions pkgs/cli/src/node.zig
@@ -86,6 +86,7 @@ pub const NodeOptions = struct {
hash_sig_key_dir: []const u8,
node_registry: *node_lib.NodeNameRegistry,
checkpoint_sync_url: ?[]const u8 = null,
is_aggregator: bool = false,

pub fn deinit(self: *NodeOptions, allocator: std.mem.Allocator) void {
for (self.bootnodes) |b| allocator.free(b);
@@ -154,6 +155,12 @@ pub const Node = struct {
// transfer ownership of the chain_options to ChainConfig
const chain_config = try ChainConfig.init(Chain.custom, chain_options);

const validator_ids = try options.getValidatorIndices(allocator);
errdefer allocator.free(validator_ids);

const gossip_topics = try node_lib.buildGossipTopicSpecs(allocator, chain_config, validator_ids, options.is_aggregator);
errdefer allocator.free(gossip_topics);

// TODO: we seem to need a single loop, because events added to the loop are not fired
// in the order in which they were added, even with an appropriate delay.
// This behavior needs further investigation, but for now we will share the same loop
@@ -168,6 +175,7 @@
.connect_peers = addresses.connect_peers,
.local_private_key = options.local_priv_key,
.node_registry = options.node_registry,
.gossip_topics = gossip_topics,
}, options.logger_config.logger(.network));
errdefer self.network.deinit();
self.clock = try Clock.init(allocator, chain_config.genesis.genesis_time, &self.loop);
@@ -220,16 +228,12 @@

try self.loadValidatorKeypairs(num_validators);

const validator_ids = try options.getValidatorIndices(allocator);
errdefer allocator.free(validator_ids);

// Initialize metrics BEFORE beam_node so that metrics set during
// initialization (like lean_validators_count) are captured on real
// metrics instead of being discarded by noop metrics.
if (options.metrics_enable) {
try api.init(allocator);
}

try self.beam_node.init(allocator, .{
.nodeId = @intCast(options.node_key_index),
.config = chain_config,
@@ -241,6 +245,7 @@
.db = db,
.logger_config = options.logger_config,
.node_registry = options.node_registry,
.is_aggregator = options.is_aggregator,
});

// Start API server after chain is initialized so we can pass the chain pointer
@@ -511,6 +516,7 @@ pub fn buildStartOptions(
const local_priv_key = try getPrivateKeyFromValidatorConfig(allocator, opts.node_key, parsed_validator_config);

const node_key_index = try nodeKeyIndexFromYaml(opts.node_key, parsed_validator_config);
const is_aggregator = try getIsAggregatorFromValidatorConfig(opts.node_key, parsed_validator_config);

const hash_sig_key_dir = try std.mem.concat(allocator, u8, &[_][]const u8{
node_cmd.custom_genesis,
@@ -528,6 +534,7 @@
opts.local_priv_key = local_priv_key;
opts.genesis_spec = genesis_spec;
opts.node_key_index = node_key_index;
opts.is_aggregator = is_aggregator;
opts.hash_sig_key_dir = hash_sig_key_dir;
opts.checkpoint_sync_url = node_cmd.@"checkpoint-sync-url";
}
@@ -873,6 +880,11 @@ fn getEnrFieldsFromValidatorConfig(allocator: std.mem.Allocator, node_key: []con
const value_str = try std.fmt.allocPrint(allocator, "0x{x:0>8}", .{@as(u32, @intCast(value.int))});
const key_copy = try allocator.dupe(u8, key);
try enr_fields.custom_fields.put(key_copy, value_str);
} else if (value == .boolean) {
const value_str = if (value.boolean) "0x01" else "0x00";
const key_copy = try allocator.dupe(u8, key);
const value_copy = try allocator.dupe(u8, value_str);
try enr_fields.custom_fields.put(key_copy, value_copy);
}
}

@@ -882,6 +894,26 @@
return error.InvalidNodeKey;
}

fn getIsAggregatorFromValidatorConfig(node_key: []const u8, validator_config: Yaml) !bool {
for (validator_config.docs.items[0].map.get("validators").?.list) |entry| {
const name_value = entry.map.get("name").?;
if (name_value == .string and std.mem.eql(u8, name_value.string, node_key)) {
const enr_fields_value = entry.map.get("enrFields");
if (enr_fields_value == null) {
return false;
}

const fields_map = enr_fields_value.?.map;
const flag_value = fields_map.get("is_aggregator") orelse return false;
if (flag_value == .boolean) {
return flag_value.boolean;
}
return error.InvalidAggregatorFlag;
}
}
return error.InvalidNodeKey;
}

fn constructENRFromFields(allocator: std.mem.Allocator, private_key: []const u8, enr_fields: EnrFields) !ENR {
// Clean up private key (remove 0x prefix if present)
const secret_key_str = if (std.mem.startsWith(u8, private_key, "0x"))
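Two of the additions above are closely related: the boolean branch in getEnrFieldsFromValidatorConfig writes boolean enrFields values into the ENR custom fields as the hex strings "0x01"/"0x00" (mirroring the "0x{x:0>8}" integer path), and getIsAggregatorFromValidatorConfig reads an optional is_aggregator boolean from the same enrFields map, defaulting to false when the map or the key is missing and failing with error.InvalidAggregatorFlag when the value is not a boolean. The snippet below is a self-contained illustration of just the boolean-to-hex convention; encodeEnrBool is a hypothetical helper, not a function in this PR.

```zig
const std = @import("std");

// Hypothetical helper mirroring the boolean branch above: booleans are stored
// as the hex strings "0x01" / "0x00".
fn encodeEnrBool(allocator: std.mem.Allocator, value: bool) ![]u8 {
    return allocator.dupe(u8, if (value) "0x01" else "0x00");
}

test "boolean enrFields values encode as 0x01 / 0x00" {
    const allocator = std.testing.allocator;
    const on = try encodeEnrBool(allocator, true);
    defer allocator.free(on);
    const off = try encodeEnrBool(allocator, false);
    defer allocator.free(off);
    try std.testing.expectEqualStrings("0x01", on);
    try std.testing.expectEqualStrings("0x00", off);
}
```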
68 changes: 60 additions & 8 deletions pkgs/network/src/ethlibp2p.zig
@@ -315,6 +315,19 @@ export fn handleMsgFromRustBridge(zigHandler: *EthLibp2p, topic_str: [*:0]const
};
break :attestationmessage .{ .attestation = message_data };
},
.aggregation => aggregationmessage: {
var message_data: types.SignedAggregatedAttestation = undefined;
ssz.deserialize(types.SignedAggregatedAttestation, uncompressed_message, &message_data, zigHandler.allocator) catch |e| {
zigHandler.logger.err("Error in deserializing the signed aggregated attestation message: {any}", .{e});
if (writeFailedBytes(uncompressed_message, "aggregation", zigHandler.allocator, null, zigHandler.logger)) |filename| {
zigHandler.logger.err("Aggregation deserialization failed - debug file created: {s}", .{filename});
} else {
zigHandler.logger.err("Aggregation deserialization failed - could not create debug file", .{});
}
return;
};
break :aggregationmessage .{ .aggregation = message_data };
},
};

const sender_peer_id_slice = std.mem.span(sender_peer_id);
@@ -351,6 +364,20 @@ export fn handleMsgFromRustBridge(zigHandler: *EthLibp2p, topic_str: [*:0]const
},
);
},
.aggregation => |signed_aggregation| {
const slot = signed_aggregation.data.slot;
zigHandler.logger.debug(
"network-{d}:: received gossip aggregated attestation slot={d} (compressed={d}B, raw={d}B) from peer={s}{}",
.{
zigHandler.params.networkId,
slot,
message_bytes.len,
uncompressed_message.len,
sender_peer_id_slice,
node_name,
},
);
},
}

// Debug-only JSON dump (conversion happens only if debug is actually emitted).
@@ -885,6 +912,7 @@ pub const EthLibp2pParams = struct {
listen_addresses: []const Multiaddr,
connect_peers: ?[]const Multiaddr,
node_registry: *const NodeNameRegistry,
gossip_topics: []const interface.GossipTopicSpec,
};

pub const EthLibp2p = struct {
@@ -927,6 +955,7 @@ pub const EthLibp2p = struct {
.listen_addresses = params.listen_addresses,
.connect_peers = params.connect_peers,
.node_registry = params.node_registry,
.gossip_topics = params.gossip_topics,
},
.gossipHandler = gossip_handler,
.peerEventHandler = peer_event_handler,
@@ -950,6 +979,7 @@
}

self.allocator.free(self.params.network_name);
self.allocator.free(self.params.gossip_topics);

var it = self.rpcCallbacks.iterator();
while (it.next()) |entry| {
@@ -974,10 +1004,28 @@
topics_list.deinit(self.allocator);
}

for (std.enums.values(interface.GossipTopic)) |gossip_topic| {
var topic = try interface.LeanNetworkTopic.init(self.allocator, gossip_topic, .ssz_snappy, self.params.network_name);
for (self.params.gossip_topics) |topic_spec| {
var topic = try interface.LeanNetworkTopic.init(
self.allocator,
topic_spec.topic,
topic_spec.subnet_id,
.ssz_snappy,
self.params.network_name,
);
defer topic.deinit();
const topic_str = try topic.encode();

var is_duplicate = false;
for (topics_list.items) |existing| {
if (std.mem.eql(u8, existing, topic_str)) {
is_duplicate = true;
break;
}
}
if (is_duplicate) {
self.allocator.free(topic_str);
continue;
}
try topics_list.append(self.allocator, topic_str);
}
const topics_str = try std.mem.joinZ(self.allocator, ",", topics_list.items);
@@ -997,15 +1045,19 @@
self.logger.info("network-{d}:: Network initialization complete, ready to send/receive messages", .{self.params.networkId});
}

pub fn publish(ptr: *anyopaque, data: *const interface.GossipMessage) anyerror!void {
pub fn publishWithTopic(ptr: *anyopaque, topic_spec: interface.GossipTopicSpec, data: *const interface.GossipMessage) anyerror!void {
const self: *Self = @ptrCast(@alignCast(ptr));
// publish
var topic = try data.getLeanNetworkTopic(self.allocator, self.params.network_name);
var topic = try interface.LeanNetworkTopic.init(
self.allocator,
topic_spec.topic,
topic_spec.subnet_id,
.ssz_snappy,
self.params.network_name,
);
defer topic.deinit();
const topic_str = try topic.encodeZ();
defer self.allocator.free(topic_str);

// TODO: deinit the message later on once done
const message = try data.serialize(self.allocator);
defer self.allocator.free(message);

@@ -1015,7 +1067,7 @@
publish_msg_to_rust_bridge(self.params.networkId, topic_str.ptr, compressed_message.ptr, compressed_message.len);
}

pub fn subscribe(ptr: *anyopaque, topics: []interface.GossipTopic, handler: interface.OnGossipCbHandler) anyerror!void {
pub fn subscribe(ptr: *anyopaque, topics: []const interface.GossipTopicSpec, handler: interface.OnGossipCbHandler) anyerror!void {
const self: *Self = @ptrCast(@alignCast(ptr));
return self.gossipHandler.subscribe(topics, handler);
}
@@ -1172,7 +1224,7 @@ pub const EthLibp2p = struct {
return .{
.gossip = .{
.ptr = self,
.publishFn = publish,
.publishWithTopicFn = publishWithTopic,
.subscribeFn = subscribe,
.onGossipFn = onGossip,
},
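The subscription loop above now deduplicates encoded topic strings, presumably because different GossipTopicSpec entries can encode to the same topic string (the redundant encoding is freed instead of appended). Below is a standalone sketch of the same linear-scan check using only standard-library types; the helper name and the example strings are illustrative and not part of the PR.

```zig
const std = @import("std");

// Appends topic_str only when no equal string is already in the list.
// Returns true when appended; on false the caller still owns topic_str.
fn appendUnique(
    allocator: std.mem.Allocator,
    list: *std.ArrayListUnmanaged([]const u8),
    topic_str: []const u8,
) !bool {
    for (list.items) |existing| {
        if (std.mem.eql(u8, existing, topic_str)) return false; // duplicate, skip
    }
    try list.append(allocator, topic_str);
    return true;
}

test "duplicate topic strings are skipped" {
    const allocator = std.testing.allocator;
    var list: std.ArrayListUnmanaged([]const u8) = .{};
    defer list.deinit(allocator);
    try std.testing.expect(try appendUnique(allocator, &list, "topic-a"));
    try std.testing.expect(!try appendUnique(allocator, &list, "topic-a"));
    try std.testing.expectEqual(@as(usize, 1), list.items.len);
}
```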
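The interface change from publish to publishWithTopic means callers now name the topic (and, via subnet_id, an optional subnet) explicitly instead of deriving it from the message with getLeanNetworkTopic. The toy model below shows that shape with stand-in types and an invented topic-string format; it does not reflect the real interface module or LeanNetworkTopic.encode().

```zig
const std = @import("std");

// Stand-in types; the real definitions live in the project's interface module.
const GossipTopic = enum { block, attestation, aggregation };
const GossipTopicSpec = struct { topic: GossipTopic, subnet_id: ?u64 = null };

// Invented encoding, used only so the example is observable in a test.
fn topicString(allocator: std.mem.Allocator, spec: GossipTopicSpec, network_name: []const u8) ![]u8 {
    if (spec.subnet_id) |id| {
        return std.fmt.allocPrint(allocator, "/{s}/{s}_{d}/ssz_snappy", .{ network_name, @tagName(spec.topic), id });
    }
    return std.fmt.allocPrint(allocator, "/{s}/{s}/ssz_snappy", .{ network_name, @tagName(spec.topic) });
}

test "an explicit topic spec can address a subnet" {
    const allocator = std.testing.allocator;
    const plain = try topicString(allocator, .{ .topic = .attestation }, "devnet");
    defer allocator.free(plain);
    const subnet = try topicString(allocator, .{ .topic = .aggregation, .subnet_id = 3 }, "devnet");
    defer allocator.free(subnet);
    try std.testing.expectEqualStrings("/devnet/attestation/ssz_snappy", plain);
    try std.testing.expectEqualStrings("/devnet/aggregation_3/ssz_snappy", subnet);
}
```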