diff --git a/pkgs/cli/src/main.zig b/pkgs/cli/src/main.zig index be2fcf09..4187170a 100644 --- a/pkgs/cli/src/main.zig +++ b/pkgs/cli/src/main.zig @@ -437,6 +437,9 @@ fn mainInner() !void { backend3 = network.getNetworkInterface(); logger1_config.logger(null).debug("--- mock gossip {any}", .{backend1.gossip}); } else { + const gossip_topics1 = try node_lib.buildGossipTopicSpecs(allocator, chain_config, null, false); + errdefer allocator.free(gossip_topics1); + network1 = try allocator.create(networks.EthLibp2p); const key_pair1 = enr_lib.KeyPair.generate(); const priv_key1 = key_pair1.v4.toString(); @@ -455,10 +458,14 @@ fn mainInner() !void { .listen_addresses = listen_addresses1, .connect_peers = null, .node_registry = test_registry1, + .gossip_topics = gossip_topics1, }, logger1_config.logger(.network)); backend1 = network1.getNetworkInterface(); // init a new lib2p network here to connect with network1 + const gossip_topics2 = try node_lib.buildGossipTopicSpecs(allocator, chain_config, null, false); + errdefer allocator.free(gossip_topics2); + network2 = try allocator.create(networks.EthLibp2p); const key_pair2 = enr_lib.KeyPair.generate(); const priv_key2 = key_pair2.v4.toString(); @@ -478,11 +485,14 @@ fn mainInner() !void { .listen_addresses = listen_addresses2, .connect_peers = connect_peers, .node_registry = test_registry2, + .gossip_topics = gossip_topics2, }, logger2_config.logger(.network)); backend2 = network2.getNetworkInterface(); // init network3 for node 3 (delayed sync node) network3 = try allocator.create(networks.EthLibp2p); + const gossip_topics3 = try node_lib.buildGossipTopicSpecs(allocator, chain_config, null, false); + errdefer allocator.free(gossip_topics3); const key_pair3 = enr_lib.KeyPair.generate(); const priv_key3 = key_pair3.v4.toString(); listen_addresses3 = try allocator.dupe(Multiaddr, &[_]Multiaddr{try Multiaddr.fromString(allocator, "/ip4/0.0.0.0/tcp/9003")}); @@ -500,6 +510,7 @@ fn mainInner() !void { .listen_addresses = listen_addresses3, .connect_peers = connect_peers3, .node_registry = test_registry3, + .gossip_topics = gossip_topics3, }, logger3_config.logger(.network)); backend3 = network3.getNetworkInterface(); logger1_config.logger(null).debug("--- ethlibp2p gossip {any}", .{backend1.gossip}); diff --git a/pkgs/cli/src/node.zig b/pkgs/cli/src/node.zig index 9a81898e..c8cbcaf5 100644 --- a/pkgs/cli/src/node.zig +++ b/pkgs/cli/src/node.zig @@ -86,6 +86,7 @@ pub const NodeOptions = struct { hash_sig_key_dir: []const u8, node_registry: *node_lib.NodeNameRegistry, checkpoint_sync_url: ?[]const u8 = null, + is_aggregator: bool = false, pub fn deinit(self: *NodeOptions, allocator: std.mem.Allocator) void { for (self.bootnodes) |b| allocator.free(b); @@ -154,6 +155,12 @@ pub const Node = struct { // transfer ownership of the chain_options to ChainConfig const chain_config = try ChainConfig.init(Chain.custom, chain_options); + const validator_ids = try options.getValidatorIndices(allocator); + errdefer allocator.free(validator_ids); + + const gossip_topics = try node_lib.buildGossipTopicSpecs(allocator, chain_config, validator_ids, options.is_aggregator); + errdefer allocator.free(gossip_topics); + // TODO we seem to be needing one loop because then the events added to loop are not being fired // in the order to which they have been added even with the an appropriate delay added // behavior of this further needs to be investigated but for now we will share the same loop @@ -168,6 +175,7 @@ pub const Node = struct { .connect_peers = addresses.connect_peers, .local_private_key = options.local_priv_key, .node_registry = options.node_registry, + .gossip_topics = gossip_topics, }, options.logger_config.logger(.network)); errdefer self.network.deinit(); self.clock = try Clock.init(allocator, chain_config.genesis.genesis_time, &self.loop); @@ -220,16 +228,12 @@ pub const Node = struct { try self.loadValidatorKeypairs(num_validators); - const validator_ids = try options.getValidatorIndices(allocator); - errdefer allocator.free(validator_ids); - // Initialize metrics BEFORE beam_node so that metrics set during // initialization (like lean_validators_count) are captured on real // metrics instead of being discarded by noop metrics. if (options.metrics_enable) { try api.init(allocator); } - try self.beam_node.init(allocator, .{ .nodeId = @intCast(options.node_key_index), .config = chain_config, @@ -241,6 +245,7 @@ pub const Node = struct { .db = db, .logger_config = options.logger_config, .node_registry = options.node_registry, + .is_aggregator = options.is_aggregator, }); // Start API server after chain is initialized so we can pass the chain pointer @@ -511,6 +516,7 @@ pub fn buildStartOptions( const local_priv_key = try getPrivateKeyFromValidatorConfig(allocator, opts.node_key, parsed_validator_config); const node_key_index = try nodeKeyIndexFromYaml(opts.node_key, parsed_validator_config); + const is_aggregator = try getIsAggregatorFromValidatorConfig(opts.node_key, parsed_validator_config); const hash_sig_key_dir = try std.mem.concat(allocator, u8, &[_][]const u8{ node_cmd.custom_genesis, @@ -528,6 +534,7 @@ pub fn buildStartOptions( opts.local_priv_key = local_priv_key; opts.genesis_spec = genesis_spec; opts.node_key_index = node_key_index; + opts.is_aggregator = is_aggregator; opts.hash_sig_key_dir = hash_sig_key_dir; opts.checkpoint_sync_url = node_cmd.@"checkpoint-sync-url"; } @@ -873,6 +880,11 @@ fn getEnrFieldsFromValidatorConfig(allocator: std.mem.Allocator, node_key: []con const value_str = try std.fmt.allocPrint(allocator, "0x{x:0>8}", .{@as(u32, @intCast(value.int))}); const key_copy = try allocator.dupe(u8, key); try enr_fields.custom_fields.put(key_copy, value_str); + } else if (value == .boolean) { + const value_str = if (value.boolean) "0x01" else "0x00"; + const key_copy = try allocator.dupe(u8, key); + const value_copy = try allocator.dupe(u8, value_str); + try enr_fields.custom_fields.put(key_copy, value_copy); } } @@ -882,6 +894,26 @@ fn getEnrFieldsFromValidatorConfig(allocator: std.mem.Allocator, node_key: []con return error.InvalidNodeKey; } +fn getIsAggregatorFromValidatorConfig(node_key: []const u8, validator_config: Yaml) !bool { + for (validator_config.docs.items[0].map.get("validators").?.list) |entry| { + const name_value = entry.map.get("name").?; + if (name_value == .string and std.mem.eql(u8, name_value.string, node_key)) { + const enr_fields_value = entry.map.get("enrFields"); + if (enr_fields_value == null) { + return false; + } + + const fields_map = enr_fields_value.?.map; + const flag_value = fields_map.get("is_aggregator") orelse return false; + if (flag_value == .boolean) { + return flag_value.boolean; + } + return error.InvalidAggregatorFlag; + } + } + return error.InvalidNodeKey; +} + fn constructENRFromFields(allocator: std.mem.Allocator, private_key: []const u8, enr_fields: EnrFields) !ENR { // Clean up private key (remove 0x prefix if present) const secret_key_str = if (std.mem.startsWith(u8, private_key, "0x")) diff --git a/pkgs/network/src/ethlibp2p.zig b/pkgs/network/src/ethlibp2p.zig index 937e1e3b..943c1ae0 100644 --- a/pkgs/network/src/ethlibp2p.zig +++ b/pkgs/network/src/ethlibp2p.zig @@ -315,6 +315,19 @@ export fn handleMsgFromRustBridge(zigHandler: *EthLibp2p, topic_str: [*:0]const }; break :attestationmessage .{ .attestation = message_data }; }, + .aggregation => aggregationmessage: { + var message_data: types.SignedAggregatedAttestation = undefined; + ssz.deserialize(types.SignedAggregatedAttestation, uncompressed_message, &message_data, zigHandler.allocator) catch |e| { + zigHandler.logger.err("Error in deserializing the signed aggregated attestation message: {any}", .{e}); + if (writeFailedBytes(uncompressed_message, "aggregation", zigHandler.allocator, null, zigHandler.logger)) |filename| { + zigHandler.logger.err("Aggregation deserialization failed - debug file created: {s}", .{filename}); + } else { + zigHandler.logger.err("Aggregation deserialization failed - could not create debug file", .{}); + } + return; + }; + break :aggregationmessage .{ .aggregation = message_data }; + }, }; const sender_peer_id_slice = std.mem.span(sender_peer_id); @@ -351,6 +364,20 @@ export fn handleMsgFromRustBridge(zigHandler: *EthLibp2p, topic_str: [*:0]const }, ); }, + .aggregation => |signed_aggregation| { + const slot = signed_aggregation.data.slot; + zigHandler.logger.debug( + "network-{d}:: received gossip aggregated attestation slot={d} (compressed={d}B, raw={d}B) from peer={s}{}", + .{ + zigHandler.params.networkId, + slot, + message_bytes.len, + uncompressed_message.len, + sender_peer_id_slice, + node_name, + }, + ); + }, } // Debug-only JSON dump (conversion happens only if debug is actually emitted). @@ -885,6 +912,7 @@ pub const EthLibp2pParams = struct { listen_addresses: []const Multiaddr, connect_peers: ?[]const Multiaddr, node_registry: *const NodeNameRegistry, + gossip_topics: []const interface.GossipTopicSpec, }; pub const EthLibp2p = struct { @@ -927,6 +955,7 @@ pub const EthLibp2p = struct { .listen_addresses = params.listen_addresses, .connect_peers = params.connect_peers, .node_registry = params.node_registry, + .gossip_topics = params.gossip_topics, }, .gossipHandler = gossip_handler, .peerEventHandler = peer_event_handler, @@ -950,6 +979,7 @@ pub const EthLibp2p = struct { } self.allocator.free(self.params.network_name); + self.allocator.free(self.params.gossip_topics); var it = self.rpcCallbacks.iterator(); while (it.next()) |entry| { @@ -974,10 +1004,28 @@ pub const EthLibp2p = struct { topics_list.deinit(self.allocator); } - for (std.enums.values(interface.GossipTopic)) |gossip_topic| { - var topic = try interface.LeanNetworkTopic.init(self.allocator, gossip_topic, .ssz_snappy, self.params.network_name); + for (self.params.gossip_topics) |topic_spec| { + var topic = try interface.LeanNetworkTopic.init( + self.allocator, + topic_spec.topic, + topic_spec.subnet_id, + .ssz_snappy, + self.params.network_name, + ); defer topic.deinit(); const topic_str = try topic.encode(); + + var is_duplicate = false; + for (topics_list.items) |existing| { + if (std.mem.eql(u8, existing, topic_str)) { + is_duplicate = true; + break; + } + } + if (is_duplicate) { + self.allocator.free(topic_str); + continue; + } try topics_list.append(self.allocator, topic_str); } const topics_str = try std.mem.joinZ(self.allocator, ",", topics_list.items); @@ -997,15 +1045,19 @@ pub const EthLibp2p = struct { self.logger.info("network-{d}:: Network initialization complete, ready to send/receive messages", .{self.params.networkId}); } - pub fn publish(ptr: *anyopaque, data: *const interface.GossipMessage) anyerror!void { + pub fn publishWithTopic(ptr: *anyopaque, topic_spec: interface.GossipTopicSpec, data: *const interface.GossipMessage) anyerror!void { const self: *Self = @ptrCast(@alignCast(ptr)); - // publish - var topic = try data.getLeanNetworkTopic(self.allocator, self.params.network_name); + var topic = try interface.LeanNetworkTopic.init( + self.allocator, + topic_spec.topic, + topic_spec.subnet_id, + .ssz_snappy, + self.params.network_name, + ); defer topic.deinit(); const topic_str = try topic.encodeZ(); defer self.allocator.free(topic_str); - // TODO: deinit the message later ob once done const message = try data.serialize(self.allocator); defer self.allocator.free(message); @@ -1015,7 +1067,7 @@ pub const EthLibp2p = struct { publish_msg_to_rust_bridge(self.params.networkId, topic_str.ptr, compressed_message.ptr, compressed_message.len); } - pub fn subscribe(ptr: *anyopaque, topics: []interface.GossipTopic, handler: interface.OnGossipCbHandler) anyerror!void { + pub fn subscribe(ptr: *anyopaque, topics: []const interface.GossipTopicSpec, handler: interface.OnGossipCbHandler) anyerror!void { const self: *Self = @ptrCast(@alignCast(ptr)); return self.gossipHandler.subscribe(topics, handler); } @@ -1172,7 +1224,7 @@ pub const EthLibp2p = struct { return .{ .gossip = .{ .ptr = self, - .publishFn = publish, + .publishWithTopicFn = publishWithTopic, .subscribeFn = subscribe, .onGossipFn = onGossip, }, diff --git a/pkgs/network/src/interface.zig b/pkgs/network/src/interface.zig index 5d301a39..73cba00a 100644 --- a/pkgs/network/src/interface.zig +++ b/pkgs/network/src/interface.zig @@ -57,11 +57,24 @@ fn freeJsonValue(val: *json.Value, allocator: Allocator) void { } } +pub const GossipTopicSpec = struct { + topic: GossipTopic, + subnet_id: ?usize = null, + + pub fn init(topic: GossipTopic) GossipTopicSpec { + return .{ .topic = topic }; + } + + pub fn attestationSubnet(subnet_id: usize) GossipTopicSpec { + return .{ .topic = .attestation, .subnet_id = subnet_id }; + } +}; + pub const GossipSub = struct { // ptr to the implementation ptr: *anyopaque, - publishFn: *const fn (ptr: *anyopaque, obj: *const GossipMessage) anyerror!void, - subscribeFn: *const fn (ptr: *anyopaque, topics: []GossipTopic, handler: OnGossipCbHandler) anyerror!void, + publishWithTopicFn: *const fn (ptr: *anyopaque, topic: GossipTopicSpec, obj: *const GossipMessage) anyerror!void, + subscribeFn: *const fn (ptr: *anyopaque, topics: []const GossipTopicSpec, handler: OnGossipCbHandler) anyerror!void, onGossipFn: *const fn (ptr: *anyopaque, data: *GossipMessage, sender_peer_id: []const u8) anyerror!void, pub fn format(self: GossipSub, comptime fmt: []const u8, options: std.fmt.FormatOptions, writer: anytype) !void { @@ -71,12 +84,12 @@ pub const GossipSub = struct { try writer.writeAll("GossipSub"); } - pub fn subscribe(self: GossipSub, topics: []GossipTopic, handler: OnGossipCbHandler) anyerror!void { + pub fn subscribe(self: GossipSub, topics: []const GossipTopicSpec, handler: OnGossipCbHandler) anyerror!void { return self.subscribeFn(self.ptr, topics, handler); } - pub fn publish(self: GossipSub, obj: *const GossipMessage) anyerror!void { - return self.publishFn(self.ptr, obj); + pub fn publishWithTopic(self: GossipSub, topic: GossipTopicSpec, obj: *const GossipMessage) anyerror!void { + return self.publishWithTopicFn(self.ptr, topic, obj); } }; @@ -144,24 +157,44 @@ pub const GossipEncoding = enum { pub const LeanNetworkTopic = struct { gossip_topic: GossipTopic, + subnet_id: ?usize, encoding: GossipEncoding, network: []const u8, allocator: Allocator, - pub fn init(allocator: Allocator, gossip_topic: GossipTopic, encoding: GossipEncoding, network: []const u8) !LeanNetworkTopic { + pub fn init(allocator: Allocator, gossip_topic: GossipTopic, subnet_id: ?usize, encoding: GossipEncoding, network: []const u8) !LeanNetworkTopic { return LeanNetworkTopic{ .allocator = allocator, .gossip_topic = gossip_topic, + .subnet_id = subnet_id, .encoding = encoding, .network = try allocator.dupe(u8, network), }; } pub fn encodeZ(self: *const LeanNetworkTopic) ![:0]u8 { + if (self.gossip_topic == .attestation) { + if (self.subnet_id) |subnet_id| { + return try std.fmt.allocPrintZ( + self.allocator, + "/{s}/{s}/attestation_{d}/{s}", + .{ topic_prefix, self.network, subnet_id, self.encoding.encode() }, + ); + } + } return try std.fmt.allocPrintZ(self.allocator, "/{s}/{s}/{s}/{s}", .{ topic_prefix, self.network, self.gossip_topic.encode(), self.encoding.encode() }); } pub fn encode(self: *const LeanNetworkTopic) ![]u8 { + if (self.gossip_topic == .attestation) { + if (self.subnet_id) |subnet_id| { + return try std.fmt.allocPrint( + self.allocator, + "/{s}/{s}/attestation_{d}/{s}", + .{ topic_prefix, self.network, subnet_id, self.encoding.encode() }, + ); + } + } return try std.fmt.allocPrint(self.allocator, "/{s}/{s}/{s}/{s}", .{ topic_prefix, self.network, self.gossip_topic.encode(), self.encoding.encode() }); } @@ -178,12 +211,22 @@ pub const LeanNetworkTopic = struct { const gossip_topic_slice = iter.next() orelse return error.InvalidTopic; const encoding_slice = iter.next() orelse return error.InvalidTopic; - const gossip_topic = try GossipTopic.decode(gossip_topic_slice); + var subnet_id: ?usize = null; + var gossip_topic: GossipTopic = undefined; + if (std.mem.startsWith(u8, gossip_topic_slice, "attestation_")) { + const subnet_slice = gossip_topic_slice["attestation_".len..]; + if (subnet_slice.len == 0) return error.InvalidTopic; + subnet_id = std.fmt.parseInt(usize, subnet_slice, 10) catch return error.InvalidTopic; + gossip_topic = .attestation; + } else { + gossip_topic = try GossipTopic.decode(gossip_topic_slice); + } const encoding = try GossipEncoding.decode(encoding_slice); return LeanNetworkTopic{ .allocator = allocator, .gossip_topic = gossip_topic, + .subnet_id = subnet_id, .encoding = encoding, .network = try allocator.dupe(u8, network_slice), }; @@ -197,6 +240,7 @@ pub const LeanNetworkTopic = struct { pub const GossipTopic = enum { block, attestation, + aggregation, pub fn encode(self: GossipTopic) []const u8 { return std.enums.tagName(GossipTopic, self).?; @@ -210,12 +254,13 @@ pub const GossipTopic = enum { pub const GossipMessage = union(GossipTopic) { block: types.SignedBlockWithAttestation, attestation: types.SignedAttestation, + aggregation: types.SignedAggregatedAttestation, const Self = @This(); pub fn getLeanNetworkTopic(self: *const Self, allocator: Allocator, network_name: []const u8) !LeanNetworkTopic { const gossip_topic = std.meta.activeTag(self.*); - return try LeanNetworkTopic.init(allocator, gossip_topic, .ssz_snappy, network_name); + return try LeanNetworkTopic.init(allocator, gossip_topic, null, .ssz_snappy, network_name); } pub fn getGossipTopic(self: *const Self) GossipTopic { @@ -234,6 +279,9 @@ pub const GossipMessage = union(GossipTopic) { att.validator_id, att.message.slot, }), + .aggregation => |agg| try writer.print("GossipMessage{{ aggregation: slot={d} }}", .{ + agg.data.slot, + }), } } @@ -263,6 +311,10 @@ pub const GossipMessage = union(GossipTopic) { cloned_data.* = .{ .attestation = undefined }; try types.sszClone(allocator, types.SignedAttestation, self.attestation, &cloned_data.attestation); }, + .aggregation => { + cloned_data.* = .{ .aggregation = undefined }; + try types.sszClone(allocator, types.SignedAggregatedAttestation, self.aggregation, &cloned_data.aggregation); + }, } return cloned_data; @@ -278,6 +330,10 @@ pub const GossipMessage = union(GossipTopic) { std.log.err("Failed to convert attestation to JSON: {any}", .{e}); return e; }, + .aggregation => |aggregation| aggregation.toJson(allocator) catch |e| { + std.log.err("Failed to convert aggregated attestation to JSON: {any}", .{e}); + return e; + }, }; } @@ -876,12 +932,15 @@ pub const GenericGossipHandler = struct { // we don't need to run the loop as this is a shared loop and is already being run by the clock } - pub fn subscribe(self: *Self, topics: []GossipTopic, handler: OnGossipCbHandler) anyerror!void { - for (topics) |topic| { + pub fn subscribe(self: *Self, topics: []const GossipTopicSpec, handler: OnGossipCbHandler) anyerror!void { + var seen = std.EnumSet(GossipTopic).initEmpty(); + for (topics) |spec| { + if (seen.contains(spec.topic)) continue; + seen.insert(spec.topic); // handlerarr should already be there - var handlerArr = self.onGossipHandlers.get(topic).?; + var handlerArr = self.onGossipHandlers.get(spec.topic).?; try handlerArr.append(self.allocator, handler); - try self.onGossipHandlers.put(self.allocator, topic, handlerArr); + try self.onGossipHandlers.put(self.allocator, spec.topic, handlerArr); } } }; @@ -903,13 +962,17 @@ test GossipTopic { try std.testing.expect(std.mem.eql(u8, gossip_topic2.encode(), "attestation")); try std.testing.expectEqual(gossip_topic2, try GossipTopic.decode("attestation")); + const gossip_topic3 = GossipTopic.aggregation; + try std.testing.expect(std.mem.eql(u8, gossip_topic3.encode(), "aggregation")); + try std.testing.expectEqual(gossip_topic3, try GossipTopic.decode("aggregation")); + try std.testing.expectError(error.InvalidDecoding, GossipTopic.decode("invalid")); } test LeanNetworkTopic { const allocator = std.testing.allocator; - var topic = try LeanNetworkTopic.init(allocator, .block, .ssz_snappy, "devnet0"); + var topic = try LeanNetworkTopic.init(allocator, .block, null, .ssz_snappy, "devnet0"); defer topic.deinit(); const topic_str = try topic.encodeZ(); @@ -923,4 +986,17 @@ test LeanNetworkTopic { try std.testing.expectEqual(topic.gossip_topic, decoded_topic.gossip_topic); try std.testing.expectEqual(topic.encoding, decoded_topic.encoding); try std.testing.expect(std.mem.eql(u8, topic.network, decoded_topic.network)); + + var subnet_topic = try LeanNetworkTopic.init(allocator, .attestation, 2, .ssz_snappy, "devnet0"); + defer subnet_topic.deinit(); + const subnet_topic_str = try subnet_topic.encodeZ(); + defer allocator.free(subnet_topic_str); + + try std.testing.expect(std.mem.eql(u8, subnet_topic_str, "/leanconsensus/devnet0/attestation_2/ssz_snappy")); + + var decoded_subnet_topic = try LeanNetworkTopic.decode(allocator, subnet_topic_str.ptr); + defer decoded_subnet_topic.deinit(); + + try std.testing.expectEqual(subnet_topic.gossip_topic, decoded_subnet_topic.gossip_topic); + try std.testing.expectEqual(subnet_topic.subnet_id, decoded_subnet_topic.subnet_id); } diff --git a/pkgs/network/src/lib.zig b/pkgs/network/src/lib.zig index ec02281f..a173e497 100644 --- a/pkgs/network/src/lib.zig +++ b/pkgs/network/src/lib.zig @@ -7,6 +7,7 @@ pub const OnGossipCbHandler = interfaceFactory.OnGossipCbHandler; pub const GossipEncoding = interfaceFactory.GossipEncoding; pub const LeanNetworkTopic = interfaceFactory.LeanNetworkTopic; pub const GossipTopic = interfaceFactory.GossipTopic; +pub const GossipTopicSpec = interfaceFactory.GossipTopicSpec; pub const GossipMessage = interfaceFactory.GossipMessage; pub const LeanSupportedProtocol = interfaceFactory.LeanSupportedProtocol; pub const ReqRespRequest = interfaceFactory.ReqRespRequest; diff --git a/pkgs/network/src/mock.zig b/pkgs/network/src/mock.zig index 8771ca11..8cb52498 100644 --- a/pkgs/network/src/mock.zig +++ b/pkgs/network/src/mock.zig @@ -608,7 +608,8 @@ pub const Mock = struct { self.finalizeServerStream(ctx); } - pub fn publish(ptr: *anyopaque, data: *const interface.GossipMessage) anyerror!void { + pub fn publishWithTopic(ptr: *anyopaque, topic: interface.GossipTopicSpec, data: *const interface.GossipMessage) anyerror!void { + _ = topic; // TODO: prevent from publishing to self handler const self: *Self = @ptrCast(@alignCast(ptr)); // Try to find a valid peer_id from connected peers, otherwise use a default @@ -625,7 +626,7 @@ pub const Mock = struct { return self.gossipHandler.onGossip(data, sender_peer_id, true); } - pub fn subscribe(ptr: *anyopaque, topics: []interface.GossipTopic, handler: interface.OnGossipCbHandler) anyerror!void { + pub fn subscribe(ptr: *anyopaque, topics: []const interface.GossipTopicSpec, handler: interface.OnGossipCbHandler) anyerror!void { const self: *Self = @ptrCast(@alignCast(ptr)); return self.gossipHandler.subscribe(topics, handler); } @@ -733,7 +734,7 @@ pub const Mock = struct { return .{ .gossip = .{ .ptr = self, - .publishFn = publish, + .publishWithTopicFn = publishWithTopic, .subscribeFn = subscribe, .onGossipFn = onGossip, }, @@ -786,7 +787,7 @@ test "Mock messaging across two subscribers" { var subscriber2 = TestSubscriber{}; // Both subscribers subscribe to the same block topic using the complete network interface - var topics = [_]interface.GossipTopic{.block}; + var topics = [_]interface.GossipTopicSpec{.{ .topic = .block }}; const network = mock.getNetworkInterface(); try network.gossip.subscribe(&topics, subscriber1.getCallbackHandler()); try network.gossip.subscribe(&topics, subscriber2.getCallbackHandler()); @@ -830,7 +831,7 @@ test "Mock messaging across two subscribers" { } }; // Publish the message using the network interface - both subscribers should receive it - try network.gossip.publish(block_message); + try network.gossip.publishWithTopic(.{ .topic = .block }, block_message); // Run the event loop to process scheduled callbacks try loop.run(.until_done); diff --git a/pkgs/node/src/chain.zig b/pkgs/node/src/chain.zig index 23260c3c..12103e7a 100644 --- a/pkgs/node/src/chain.zig +++ b/pkgs/node/src/chain.zig @@ -80,6 +80,42 @@ pub const ProducedBlock = struct { } }; +fn checkpointEquals(a: types.Checkpoint, b: types.Checkpoint) bool { + return a.slot == b.slot and std.mem.eql(u8, &a.root, &b.root); +} + +fn buildAggregatedAttestationsFromAttestations( + allocator: Allocator, + attestations: []const types.Attestation, +) !types.AggregatedAttestations { + var aggregated = try types.AggregatedAttestations.init(allocator); + errdefer { + for (aggregated.slice()) |*att| { + att.deinit(); + } + aggregated.deinit(); + } + + var root_indices = std.AutoHashMap(types.Root, usize).init(allocator); + defer root_indices.deinit(); + + for (attestations) |att| { + const data_root = try att.data.sszRoot(allocator); + if (root_indices.get(data_root)) |idx| { + var slice = aggregated.slice(); + try types.aggregationBitsSet(&slice[idx].aggregation_bits, @intCast(att.validator_id), true); + } else { + var bits = try types.AggregationBits.init(allocator); + errdefer bits.deinit(); + try types.aggregationBitsSet(&bits, @intCast(att.validator_id), true); + try aggregated.append(.{ .aggregation_bits = bits, .data = att.data }); + try root_indices.put(data_root, aggregated.len() - 1); + } + } + + return aggregated; +} + pub const BeamChain = struct { config: configs.ChainConfig, anchor_state: *types.BeamState, @@ -96,6 +132,7 @@ pub const BeamChain = struct { stf_logger: zeam_utils.ModuleLogger, block_building_logger: zeam_utils.ModuleLogger, registered_validator_ids: []usize = &[_]usize{}, + is_aggregator: bool = false, db: database.Db, // Track last-emitted checkpoints to avoid duplicate SSE events (e.g., genesis spam) last_emitted_justified: types.Checkpoint, @@ -179,6 +216,20 @@ pub const BeamChain = struct { // tacking registrations and keeping it alive for 3*2=6 slots self.registered_validator_ids = validator_ids; zeam_metrics.metrics.lean_validators_count.set(self.registered_validator_ids.len); + self.updateAggregatorInfo() catch |err| { + self.module_logger.warn("failed to update aggregator info: {any}", .{err}); + }; + } + + fn updateAggregatorInfo(self: *Self) !void { + try self.forkChoice.setAggregatorInfo(self.is_aggregator, self.registered_validator_ids); + } + + pub fn setAggregator(self: *Self, is_aggregator: bool) void { + self.is_aggregator = is_aggregator; + self.updateAggregatorInfo() catch |err| { + self.module_logger.warn("failed to update aggregator info: {any}", .{err}); + }; } pub fn onInterval(self: *Self, time_intervals: usize) !void { @@ -288,48 +339,42 @@ pub const BeamChain = struct { // one must make the forkchoice tick to the right time if there is a race condition // however in that scenario forkchoice also needs to be protected by mutex/kept thread safe const chainHead = try self.forkChoice.updateHead(); - const attestations = try self.forkChoice.getProposalAttestations(); - defer self.allocator.free(attestations); - const parent_root = chainHead.blockRoot; - const pre_state = self.states.get(parent_root) orelse return BlockProductionError.MissingPreState; - var post_state_opt: ?*types.BeamState = try self.allocator.create(types.BeamState); + + const available_attestations = try self.forkChoice.getLatestNewAttestations(); + defer self.allocator.free(available_attestations); + + var selected_attestations = try self.selectAttestationsForBlock( + opts, + parent_root, + pre_state, + available_attestations, + ); + defer selected_attestations.deinit(); + var post_state_opt = try self.clonePostState(pre_state); errdefer if (post_state_opt) |post_state_ptr| { post_state_ptr.deinit(); self.allocator.destroy(post_state_ptr); }; const post_state = post_state_opt.?; - try types.sszClone(self.allocator, types.BeamState, pre_state.*, post_state); - // Use the two-phase aggregation algorithm: - // Phase 1: Collect individual signatures from gossip_signatures - // Phase 2: Fallback to aggregated_payloads using greedy set-cover - var aggregation = try types.AggregatedAttestationsResult.init(self.allocator); - var agg_att_cleanup = true; - var agg_sig_cleanup = true; - errdefer if (agg_att_cleanup) { - for (aggregation.attestations.slice()) |*att| { + // Select aggregated proofs from stored payloads (no local aggregation fallback). + var aggregation = try self.selectAggregatedProofsForBlock(selected_attestations.items); + var agg_att_opt: ?*types.AggregatedAttestations = &aggregation.attestations; + var agg_sig_opt: ?*types.AttestationSignatures = &aggregation.attestation_signatures; + errdefer if (agg_att_opt) |list| { + for (list.slice()) |*att| { att.deinit(); } - aggregation.attestations.deinit(); + list.deinit(); }; - errdefer if (agg_sig_cleanup) { - for (aggregation.attestation_signatures.slice()) |*sig| { + errdefer if (agg_sig_opt) |list| { + for (list.slice()) |*sig| { sig.deinit(); } - aggregation.attestation_signatures.deinit(); + list.deinit(); }; - // Lock mutex to protect concurrent access to gossip_signatures and aggregated_payloads - self.forkChoice.signatures_mutex.lock(); - defer self.forkChoice.signatures_mutex.unlock(); - try aggregation.computeAggregatedSignatures( - attestations, - &pre_state.validators, - &self.forkChoice.gossip_signatures, - &self.forkChoice.aggregated_payloads, - ); - // keeping for later when execution will be integrated into lean // const timestamp = self.config.genesis.genesis_time + opts.slot * params.SECONDS_PER_SLOT; @@ -343,11 +388,11 @@ pub const BeamChain = struct { .attestations = aggregation.attestations, }, }; - agg_att_cleanup = false; // Ownership moved to block.body.attestations + agg_att_opt = null; // Ownership moved to block.body.attestations errdefer block.deinit(); var attestation_signatures = aggregation.attestation_signatures; - agg_sig_cleanup = false; // Ownership moved to attestation_signatures + agg_sig_opt = null; // Ownership moved to attestation_signatures errdefer { for (attestation_signatures.slice()) |*sig_group| { sig_group.deinit(); @@ -355,52 +400,174 @@ pub const BeamChain = struct { attestation_signatures.deinit(); } + const block_root = try self.applyBlockAndCacheState(opts, &block, post_state, &post_state_opt); + + var forkchoice_added = false; + errdefer if (!forkchoice_added) { + if (self.states.fetchRemove(block_root)) |entry| { + entry.value.deinit(); + self.allocator.destroy(entry.value); + } + }; + + // 4. Add the block to directly forkchoice as this proposer will next need to construct its vote + // note - attestations packed in the block are already in the knownVotes so we don't need to re-import + // them in the forkchoice + try self.addBlockToForkchoice(&block, post_state, block_root); + forkchoice_added = true; + + return .{ + .block = block, + .blockRoot = block_root, + .attestation_signatures = attestation_signatures, + }; + } + + fn clonePostState( + self: *Self, + pre_state: *const types.BeamState, + ) !?*types.BeamState { + const post_state_opt: ?*types.BeamState = try self.allocator.create(types.BeamState); + const post_state = post_state_opt.?; + try types.sszClone(self.allocator, types.BeamState, pre_state.*, post_state); + return post_state_opt; + } + + fn applyBlockAndCacheState( + self: *Self, + opts: BlockProductionParams, + block: *types.BeamBlock, + post_state: *types.BeamState, + post_state_opt: *?*types.BeamState, + ) !types.Root { const block_str = try block.toJsonString(self.allocator); defer self.allocator.free(block_str); - self.module_logger.debug("node-{d}::going for block production opts={any} raw block={s}", .{ self.nodeId, opts, block_str }); // 2. apply STF to get post state & update post state root & cache it - try stf.apply_raw_block(self.allocator, post_state, &block, self.block_building_logger); + try stf.apply_raw_block(self.allocator, post_state, block, self.block_building_logger); const block_str_2 = try block.toJsonString(self.allocator); defer self.allocator.free(block_str_2); - self.module_logger.debug("applied raw block opts={any} raw block={s}", .{ opts, block_str_2 }); // 3. cache state to save recompute while adding the block on publish var block_root: [32]u8 = undefined; - try zeam_utils.hashTreeRoot(types.BeamBlock, block, &block_root, self.allocator); - + try zeam_utils.hashTreeRoot(types.BeamBlock, block.*, &block_root, self.allocator); try self.states.put(block_root, post_state); - post_state_opt = null; + post_state_opt.* = null; - var forkchoice_added = false; - errdefer if (!forkchoice_added) { - if (self.states.fetchRemove(block_root)) |entry| { - entry.value.deinit(); - self.allocator.destroy(entry.value); - } - }; + return block_root; + } + fn addBlockToForkchoice( + self: *Self, + block: *types.BeamBlock, + post_state: *types.BeamState, + block_root: types.Root, + ) !void { // 4. Add the block to directly forkchoice as this proposer will next need to construct its vote // note - attestations packed in the block are already in the knownVotes so we don't need to re-import // them in the forkchoice - _ = try self.forkChoice.onBlock(block, post_state, .{ + _ = try self.forkChoice.onBlock(block.*, post_state, .{ .currentSlot = block.slot, .blockDelayMs = 0, .blockRoot = block_root, // confirmed in publish .confirmed = false, }); - forkchoice_added = true; _ = try self.forkChoice.updateHead(); + } - return .{ - .block = block, - .blockRoot = block_root, - .attestation_signatures = attestation_signatures, - }; + fn selectAttestationsForBlock( + self: *Self, + opts: BlockProductionParams, + parent_root: types.Root, + pre_state: *const types.BeamState, + available_attestations: []const types.Attestation, + ) !std.ArrayList(types.Attestation) { + var selected_attestations = std.ArrayList(types.Attestation).init(self.allocator); + errdefer selected_attestations.deinit(); + + if (available_attestations.len == 0) { + return selected_attestations; + } + + var selected_keys = std.AutoHashMap(types.SignatureKey, void).init(self.allocator); + defer selected_keys.deinit(); + + while (true) { + const candidate_attestations = + try buildAggregatedAttestationsFromAttestations(self.allocator, selected_attestations.items); + var candidate_block = types.BeamBlock{ + .slot = opts.slot, + .proposer_index = opts.proposer_index, + .parent_root = parent_root, + .state_root = undefined, + .body = types.BeamBlockBody{ + .attestations = candidate_attestations, + }, + }; + defer candidate_block.deinit(); + + var temp_state_ptr_opt: ?*types.BeamState = try self.allocator.create(types.BeamState); + errdefer if (temp_state_ptr_opt) |ptr| self.allocator.destroy(ptr); + + const temp_state_ptr = temp_state_ptr_opt.?; + try types.sszClone(self.allocator, types.BeamState, pre_state.*, temp_state_ptr); + temp_state_ptr_opt = null; + defer { + temp_state_ptr.deinit(); + self.allocator.destroy(temp_state_ptr); + } + try stf.apply_raw_block(self.allocator, temp_state_ptr, &candidate_block, self.block_building_logger); + + var added_any = false; + var no_payloads = false; + self.forkChoice.signatures_mutex.lock(); + defer self.forkChoice.signatures_mutex.unlock(); + if (self.forkChoice.aggregated_payloads.count() == 0) { + no_payloads = true; + } else { + for (available_attestations) |attestation| { + const data = attestation.data; + if (!self.forkChoice.protoArray.indices.contains(data.head.root)) continue; + if (!checkpointEquals(data.source, temp_state_ptr.latest_justified)) continue; + + const data_root = try data.sszRoot(self.allocator); + const sig_key = types.SignatureKey{ + .validator_id = attestation.validator_id, + .data_root = data_root, + }; + + if (selected_keys.contains(sig_key)) continue; + if (self.forkChoice.aggregated_payloads.get(sig_key) == null) continue; + + try selected_attestations.append(attestation); + try selected_keys.put(sig_key, {}); + added_any = true; + } + } + + if (no_payloads or !added_any) break; + } + + return selected_attestations; + } + + fn selectAggregatedProofsForBlock( + self: *Self, + selected_attestations: []const types.Attestation, + ) !types.AggregatedAttestationsResult { + var aggregation = try types.AggregatedAttestationsResult.init(self.allocator); + errdefer aggregation.deinit(); + self.forkChoice.signatures_mutex.lock(); + defer self.forkChoice.signatures_mutex.unlock(); + try aggregation.selectAggregatedProofs( + selected_attestations, + &self.forkChoice.aggregated_payloads, + ); + return aggregation; } pub fn constructAttestationData(self: *Self, opts: AttestationConstructionParams) !types.AttestationData { @@ -597,6 +764,22 @@ pub const BeamChain = struct { zeam_metrics.metrics.lean_attestations_valid_total.incr(.{ .source = "gossip" }) catch {}; return .{}; }, + .aggregation => |signed_aggregation| { + const slot = signed_aggregation.data.slot; + const sender_node_name = self.node_registry.getNodeNameFromPeerId(sender_peer_id); + self.module_logger.debug("chain received gossip aggregated attestation for slot={d} from peer={s}{}", .{ + slot, + sender_peer_id, + sender_node_name, + }); + + self.onGossipAggregatedAttestation(signed_aggregation) catch |err| { + self.module_logger.err("aggregated attestation processing error: {any}", .{err}); + return err; + }; + self.module_logger.info("processed gossip aggregated attestation for slot={d}", .{slot}); + return .{}; + }, } } @@ -865,7 +1048,7 @@ pub const BeamChain = struct { continue; }; - self.forkChoice.storeAggregatedPayload(validator_id, &aggregated_attestation.data, cloned_proof) catch |e| { + self.forkChoice.storeAggregatedPayload(validator_id, block.slot, &aggregated_attestation.data, cloned_proof) catch |e| { self.module_logger.warn("failed to store aggregated payload for validator={d}: {any}", .{ validator_index, e }); cloned_proof.deinit(); }; @@ -1172,6 +1355,205 @@ pub const BeamChain = struct { return self.forkChoice.onGossipAttestation(signedAttestation, false); } + pub fn aggregateCommitteeSignatures(self: *Self) !std.ArrayList(types.SignedAggregatedAttestation) { + var gossip_messages = std.ArrayList(types.SignedAggregatedAttestation).init(self.allocator); + errdefer { + for (gossip_messages.items) |*msg| { + msg.deinit(); + } + gossip_messages.deinit(); + } + + const attestations = try self.forkChoice.getLatestNewAttestations(); + defer self.allocator.free(attestations); + + if (attestations.len == 0) return gossip_messages; + + var filtered_attestations = try self.filterCommitteeAttestations(attestations); + defer filtered_attestations.deinit(); + + if (filtered_attestations.items.len == 0) return gossip_messages; + + const head_state = self.states.get(self.forkChoice.head.blockRoot) orelse return AttestationValidationError.MissingState; + const current_slot = self.forkChoice.fcStore.timeSlots; + + var aggregation = try types.AggregatedAttestationsResult.init(self.allocator); + defer aggregation.deinit(); + + var gossip_snapshot = try self.snapshotGossipSignatures(filtered_attestations.items); + defer gossip_snapshot.deinit(); + try aggregation.aggregateGossipSignatures( + filtered_attestations.items, + &head_state.validators, + &gossip_snapshot, + ); + + const aggregated_attestations = aggregation.attestations.constSlice(); + const signature_groups = aggregation.attestation_signatures.constSlice(); + try self.appendAggregatedCommitteeMessages( + aggregated_attestations, + signature_groups, + current_slot, + &gossip_messages, + ); + + return gossip_messages; + } + + fn filterCommitteeAttestations( + self: *Self, + attestations: []const types.Attestation, + ) !std.ArrayList(types.Attestation) { + var filtered_attestations = std.ArrayList(types.Attestation).init(self.allocator); + errdefer filtered_attestations.deinit(); + try filtered_attestations.ensureTotalCapacity(attestations.len); + + for (attestations) |attestation| { + if (self.forkChoice.shouldStoreCommitteeSignature(attestation.validator_id)) { + try filtered_attestations.append(attestation); + } + } + + return filtered_attestations; + } + + fn snapshotGossipSignatures( + self: *Self, + attestations: []const types.Attestation, + ) !types.SignaturesMap { + // Build the unique key set outside the lock; we only need a stable snapshot of these entries. + var signature_keys = std.AutoHashMap(types.SignatureKey, void).init(self.allocator); + defer signature_keys.deinit(); + + for (attestations) |attestation| { + const data_root = try attestation.data.sszRoot(self.allocator); + const sig_key = types.SignatureKey{ + .validator_id = attestation.validator_id, + .data_root = data_root, + }; + if (!signature_keys.contains(sig_key)) { + try signature_keys.put(sig_key, {}); + } + } + + // Snapshot only the needed signatures under lock, then aggregate outside the lock. + // This keeps the critical section short and avoids blocking writers during XMSS aggregation. + var gossip_snapshot = types.SignaturesMap.init(self.allocator); + errdefer gossip_snapshot.deinit(); + + var signatures_lock: ?*std.Thread.Mutex = &self.forkChoice.signatures_mutex; + signatures_lock.?.lock(); + errdefer if (signatures_lock) |lock| lock.unlock(); + var key_it = signature_keys.keyIterator(); + while (key_it.next()) |sig_key| { + if (self.forkChoice.gossip_signatures.get(sig_key.*)) |sig_entry| { + try gossip_snapshot.put(sig_key.*, sig_entry); + } + } + signatures_lock.?.unlock(); + signatures_lock = null; + + return gossip_snapshot; + } + + fn appendAggregatedCommitteeMessages( + self: *Self, + aggregated_attestations: []const types.AggregatedAttestation, + signature_groups: []const types.AggregatedSignatureProof, + current_slot: types.Slot, + gossip_messages: *std.ArrayList(types.SignedAggregatedAttestation), + ) !void { + if (aggregated_attestations.len != signature_groups.len) { + return error.AggregationMismatch; + } + + for (aggregated_attestations, 0..) |aggregated_attestation, index| { + const proof = signature_groups[index]; + + var proof_clone: types.AggregatedSignatureProof = undefined; + try types.sszClone(self.allocator, types.AggregatedSignatureProof, proof, &proof_clone); + var proof_opt: ?*types.AggregatedSignatureProof = &proof_clone; + errdefer if (proof_opt) |p| p.deinit(); + + try gossip_messages.append(.{ + .data = aggregated_attestation.data, + .proof = proof_clone, + }); + proof_opt = null; + + var validator_indices = try types.aggregationBitsToValidatorIndices(&aggregated_attestation.aggregation_bits, self.allocator); + defer validator_indices.deinit(); + + for (validator_indices.items) |validator_index| { + var stored_proof: types.AggregatedSignatureProof = undefined; + try types.sszClone(self.allocator, types.AggregatedSignatureProof, proof, &stored_proof); + self.forkChoice.storeAggregatedPayload(@intCast(validator_index), current_slot, &aggregated_attestation.data, stored_proof) catch |e| { + stored_proof.deinit(); + self.module_logger.warn("failed to store aggregated payload from committee aggregation: {any}", .{e}); + }; + } + } + } + + pub fn onGossipAggregatedAttestation(self: *Self, signed_attestation: types.SignedAggregatedAttestation) !void { + const data = signed_attestation.data; + const proof = &signed_attestation.proof; + const current_slot = self.forkChoice.fcStore.timeSlots; + + var validator_indices = try types.aggregationBitsToValidatorIndices(&proof.participants, self.allocator); + defer validator_indices.deinit(); + + if (validator_indices.items.len == 0) { + return error.EmptyAggregation; + } + + const representative_attestation = types.Attestation{ + .validator_id = @intCast(validator_indices.items[0]), + .data = data, + }; + try self.validateAttestation(representative_attestation, false); + + const key_state = self.states.get(data.target.root) orelse return AttestationValidationError.MissingState; + + var pubkeys = std.ArrayList(xmss.PublicKey).init(self.allocator); + defer { + for (pubkeys.items) |*pk| pk.deinit(); + pubkeys.deinit(); + } + + var pk_handles = try self.allocator.alloc(*const xmss.HashSigPublicKey, validator_indices.items.len); + defer self.allocator.free(pk_handles); + + for (validator_indices.items, 0..) |validator_id, i| { + const val = try key_state.validators.get(@intCast(validator_id)); + const pk = try xmss.PublicKey.fromBytes(&val.pubkey); + try pubkeys.append(pk); + pk_handles[i] = pk.handle; + } + + var message_hash: [32]u8 = undefined; + try zeam_utils.hashTreeRoot(types.AttestationData, data, &message_hash, self.allocator); + + try proof.verify(pk_handles, &message_hash, data.slot); + + for (validator_indices.items) |validator_id| { + var stored_proof: types.AggregatedSignatureProof = undefined; + try types.sszClone(self.allocator, types.AggregatedSignatureProof, proof.*, &stored_proof); + self.forkChoice.storeAggregatedPayload(@intCast(validator_id), current_slot, &data, stored_proof) catch |e| { + stored_proof.deinit(); + self.module_logger.warn("failed to store aggregated payload from aggregation gossip: {any}", .{e}); + }; + + const attestation = types.Attestation{ + .validator_id = @intCast(validator_id), + .data = data, + }; + self.forkChoice.onAttestation(attestation, false) catch |e| { + self.module_logger.err("error processing aggregated gossip attestation={any} e={any}", .{ attestation, e }); + }; + } + } + pub fn getStatus(self: *Self) types.Status { const finalized = self.forkChoice.fcStore.latest_finalized; const head = self.forkChoice.head; diff --git a/pkgs/node/src/constants.zig b/pkgs/node/src/constants.zig index cd9c60f4..7338ad56 100644 --- a/pkgs/node/src/constants.zig +++ b/pkgs/node/src/constants.zig @@ -2,7 +2,7 @@ const std = @import("std"); const params = @import("@zeam/params"); // a constant fixed only relevant to node operation and hence not in the config or preset -pub const INTERVALS_PER_SLOT = 4; +pub const INTERVALS_PER_SLOT = 5; pub const SECONDS_PER_INTERVAL_MS: isize = @divFloor(params.SECONDS_PER_SLOT * std.time.ms_per_s, INTERVALS_PER_SLOT); // Maximum number of slots in the future that an attestation is allowed to reference diff --git a/pkgs/node/src/forkchoice.zig b/pkgs/node/src/forkchoice.zig index 1a0fac0a..67436d36 100644 --- a/pkgs/node/src/forkchoice.zig +++ b/pkgs/node/src/forkchoice.zig @@ -12,6 +12,9 @@ const params = @import("@zeam/params"); const constants = @import("./constants.zig"); +const ATTESTATION_COMMITTEE_COUNT: usize = @intCast(params.ATTESTATION_COMMITTEE_COUNT); +const LocalSubnetSet = std.StaticBitSet(ATTESTATION_COMMITTEE_COUNT); + const AggregatedSignatureProof = types.AggregatedSignatureProof; const Root = types.Root; const ValidatorIndex = types.ValidatorIndex; @@ -256,6 +259,9 @@ pub const ForkChoice = struct { aggregated_payloads: AggregatedPayloadsMap, // Mutex to protect concurrent access to gossip_signatures and aggregated_payloads signatures_mutex: std.Thread.Mutex, + // Aggregator role and subnet tracking + is_aggregator: bool, + local_subnets: LocalSubnetSet, const Self = @This(); pub fn init(allocator: Allocator, opts: ForkChoiceParams) !Self { @@ -288,6 +294,7 @@ pub const ForkChoice = struct { const deltas = std.ArrayList(isize).init(allocator); const gossip_signatures = SignaturesMap.init(allocator); const aggregated_payloads = AggregatedPayloadsMap.init(allocator); + const local_subnets = LocalSubnetSet.initEmpty(); var fc = Self{ .allocator = allocator, @@ -303,6 +310,8 @@ pub const ForkChoice = struct { .gossip_signatures = gossip_signatures, .aggregated_payloads = aggregated_payloads, .signatures_mutex = .{}, + .is_aggregator = false, + .local_subnets = local_subnets, }; _ = try fc.updateHead(); return fc; @@ -313,7 +322,7 @@ pub const ForkChoice = struct { self.protoArray.indices.deinit(); self.attestations.deinit(); self.deltas.deinit(); - + self.signatures_mutex.lock(); defer self.signatures_mutex.unlock(); self.gossip_signatures.deinit(); @@ -329,6 +338,53 @@ pub const ForkChoice = struct { self.aggregated_payloads.deinit(); } + fn computeSubnetId(validator_id: usize) usize { + if (ATTESTATION_COMMITTEE_COUNT == 0) return 0; + return validator_id % ATTESTATION_COMMITTEE_COUNT; + } + + pub fn setAggregatorInfo(self: *Self, is_aggregator: bool, validator_ids: []usize) !void { + self.is_aggregator = is_aggregator; + + // Clear existing subnet assignments + self.local_subnets.setRangeValue(.{ .start = 0, .end = ATTESTATION_COMMITTEE_COUNT }, false); + + if (!is_aggregator) return; + if (ATTESTATION_COMMITTEE_COUNT == 0) return; + + for (validator_ids) |validator_id| { + const subnet_id = computeSubnetId(validator_id); + self.local_subnets.set(subnet_id); + } + } + + pub fn shouldStoreCommitteeSignature(self: *Self, validator_id: types.ValidatorIndex) bool { + if (!self.is_aggregator) return false; + if (ATTESTATION_COMMITTEE_COUNT == 0) return false; + const subnet_id = computeSubnetId(@intCast(validator_id)); + return self.local_subnets.isSet(subnet_id); + } + + pub fn getLatestNewAttestations(self: *Self) ![]types.Attestation { + var included_attestations = std.ArrayList(types.Attestation).init(self.allocator); + errdefer included_attestations.deinit(); + + for (0..self.config.genesis.numValidators()) |validator_id| { + const attestation_data = ((self.attestations.get(validator_id) orelse AttestationTracker{}) + .latestNew orelse ProtoAttestation{}).attestation_data; + + if (attestation_data) |att_data| { + const attestation = types.Attestation{ + .data = att_data, + .validator_id = @intCast(validator_id), + }; + try included_attestations.append(attestation); + } + } + + return included_attestations.toOwnedSlice(); + } + fn isBlockTimely(self: *Self, blockDelayMs: usize) bool { _ = self; _ = blockDelayMs; @@ -653,10 +709,11 @@ pub const ForkChoice = struct { } }, 1 => {}, - 2 => { + 2 => {}, + 3 => { _ = try self.updateSafeTarget(); }, - 3 => { + 4 => { _ = try self.acceptNewAttestations(); }, else => @panic("invalid interval"), @@ -685,48 +742,6 @@ pub const ForkChoice = struct { return self.updateHead(); } - pub fn getProposalHead(self: *Self, slot: types.Slot) !types.Checkpoint { - const time_intervals = slot * constants.INTERVALS_PER_SLOT; - // this could be called independently by the validator when its a separate process - // and FC would need to be protected by mutex to make it thread safe but for now - // this is deterministally called after the fc has been ticked ahead - // so the following call should be a no-op - try self.onInterval(time_intervals, true); - // accept any new attestations in case previous ontick was a no-op and either the validator - // wasn't registered or there have been new attestations - const head = try self.acceptNewAttestations(); - - return types.Checkpoint{ - .root = head.blockRoot, - .slot = head.slot, - }; - } - - pub fn getProposalAttestations(self: *Self) ![]types.Attestation { - var included_attestations = std.ArrayList(types.Attestation).init(self.allocator); - const latest_justified = self.fcStore.latest_justified; - - // TODO naive strategy to include all attestations that are consistent with the latest justified - // replace by the other mini 3sf simple strategy to loop and see if justification happens and - // till no further attestations can be added - for (0..self.config.genesis.numValidators()) |validator_id| { - const attestation_data = ((self.attestations.get(validator_id) orelse AttestationTracker{}) - // - .latestKnown orelse ProtoAttestation{}).attestation_data; - - if (attestation_data) |att_data| { - if (std.mem.eql(u8, &latest_justified.root, &att_data.source.root)) { - const attestation = types.Attestation{ - .data = att_data, - .validator_id = @intCast(validator_id), - }; - try included_attestations.append(attestation); - } - } - } - return included_attestations.toOwnedSlice(); - } - pub fn getAttestationTarget(self: *Self) !types.Checkpoint { var target_idx = self.protoArray.indices.get(self.head.blockRoot) orelse return ForkChoiceError.InvalidHeadIndex; const nodes = self.protoArray.nodes.items; @@ -900,18 +915,20 @@ pub const ForkChoice = struct { const validator_id = signed_attestation.validator_id; const attestation_slot = attestation_data.slot; - // Store the gossip signature for later lookup during block building - const data_root = try attestation_data.sszRoot(self.allocator); - const sig_key = SignatureKey{ - .validator_id = validator_id, - .data_root = data_root, - }; - self.signatures_mutex.lock(); - defer self.signatures_mutex.unlock(); - try self.gossip_signatures.put(sig_key, .{ - .slot = attestation_slot, - .signature = signed_attestation.signature, - }); + // Store the gossip signature for committee aggregation if applicable + if (self.shouldStoreCommitteeSignature(validator_id)) { + const data_root = try attestation_data.sszRoot(self.allocator); + const sig_key = SignatureKey{ + .validator_id = validator_id, + .data_root = data_root, + }; + self.signatures_mutex.lock(); + defer self.signatures_mutex.unlock(); + try self.gossip_signatures.put(sig_key, .{ + .slot = attestation_slot, + .signature = signed_attestation.signature, + }); + } const attestation = types.Attestation{ .data = attestation_data, @@ -966,39 +983,65 @@ pub const ForkChoice = struct { try self.attestations.put(validator_id, attestation_tracker); } - /// Store an aggregated signature proof for a validator from a block. + /// Store an aggregated signature proof for a validator. /// This allows future block builders to reuse this aggregation. + /// + /// Storage model: + /// - Index by (validator_id, data_root) so any participant can retrieve proofs. + /// - Keep both attestation_slot and source_slot: + /// * attestation_slot is the slot of the attested data (used for pruning). + /// * source_slot is when the proof was learned (block slot or local time), used for tie-breaks. + /// - Maintain each list ordered by source_slot (most recent first) for deterministic selection. pub fn storeAggregatedPayload( self: *Self, validator_id: types.ValidatorIndex, + source_slot: types.Slot, attestation_data: *const types.AttestationData, proof: types.AggregatedSignatureProof, ) !void { + self.signatures_mutex.lock(); + defer self.signatures_mutex.unlock(); const data_root = try attestation_data.sszRoot(self.allocator); const sig_key = SignatureKey{ .validator_id = validator_id, .data_root = data_root, }; - self.signatures_mutex.lock(); - defer self.signatures_mutex.unlock(); - // Get or create the list for this key + // Get or create the list for this key. const gop = try self.aggregated_payloads.getOrPut(sig_key); if (!gop.found_existing) { gop.value_ptr.* = AggregatedPayloadsList.init(self.allocator); } - try gop.value_ptr.append(.{ - .slot = attestation_data.slot, + const payload = StoredAggregatedPayload{ + .attestation_slot = attestation_data.slot, + .source_slot = source_slot, .proof = proof, - }); + }; + + // Keep proofs ordered by source slot (most recent first) for deterministic tie-breaking. + var insert_idx: usize = gop.value_ptr.items.len; + for (gop.value_ptr.items, 0..) |item, idx| { + if (item.source_slot <= payload.source_slot) { + insert_idx = idx; + break; + } + } + + if (insert_idx == gop.value_ptr.items.len) { + try gop.value_ptr.append(payload); + } else { + try gop.value_ptr.insert(insert_idx, payload); + } } /// Prune gossip_signatures and aggregated_payloads for attestations at or before the finalized slot. /// This is called after finalization to clean up signature data that is no longer needed. + /// + /// Note: We prune by attestation_slot (not source_slot). Once the data is finalized, + /// any proofs for it are irrelevant even if learned later. pub fn pruneSignatureMaps(self: *Self, finalized_slot: types.Slot) !void { self.signatures_mutex.lock(); defer self.signatures_mutex.unlock(); - var gossip_keys_to_remove = std.ArrayList(SignatureKey).init(self.allocator); defer gossip_keys_to_remove.deinit(); @@ -1030,7 +1073,7 @@ pub const ForkChoice = struct { var removed_here: usize = 0; for (list.items) |*stored| { - if (stored.slot <= finalized_slot) { + if (stored.attestation_slot <= finalized_slot) { stored.proof.deinit(); removed_here += 1; } else { @@ -1244,6 +1287,66 @@ fn createTestProtoBlock(slot: types.Slot, block_root_byte: u8, parent_root_byte: }; } +test "storeAggregatedPayload orders proofs by most recent" { + var arena_allocator = std.heap.ArenaAllocator.init(std.testing.allocator); + defer arena_allocator.deinit(); + const allocator = arena_allocator.allocator(); + + const mock_chain = try stf.genMockChain(allocator, 1, null); + const spec_name = try allocator.dupe(u8, "beamdev"); + const chain_config = configs.ChainConfig{ + .id = configs.Chain.custom, + .genesis = mock_chain.genesis_config, + .spec = .{ + .preset = params.Preset.mainnet, + .name = spec_name, + }, + }; + var beam_state = mock_chain.genesis_state; + var zeam_logger_config = zeam_utils.getTestLoggerConfig(); + const module_logger = zeam_logger_config.logger(.forkchoice); + + var fork_choice = try ForkChoice.init(allocator, .{ + .config = chain_config, + .anchorState = &beam_state, + .logger = module_logger, + }); + defer fork_choice.deinit(); + + const att_data = types.AttestationData{ + .slot = 5, + .head = .{ .root = types.ZERO_HASH, .slot = 5 }, + .target = .{ .root = types.ZERO_HASH, .slot = 5 }, + .source = .{ .root = types.ZERO_HASH, .slot = 0 }, + }; + + var proof_old = try AggregatedSignatureProof.init(allocator); + var proof_old_moved = false; + errdefer if (!proof_old_moved) proof_old.deinit(); + try types.aggregationBitsSet(&proof_old.participants, 0, true); + try types.aggregationBitsSet(&proof_old.participants, 1, false); + + var proof_new = try AggregatedSignatureProof.init(allocator); + var proof_new_moved = false; + errdefer if (!proof_new_moved) proof_new.deinit(); + try types.aggregationBitsSet(&proof_new.participants, 0, true); + try types.aggregationBitsSet(&proof_new.participants, 1, true); + + try fork_choice.storeAggregatedPayload(0, att_data.slot, &att_data, proof_old); + proof_old_moved = true; + try fork_choice.storeAggregatedPayload(0, att_data.slot, &att_data, proof_new); + proof_new_moved = true; + + const data_root = try att_data.sszRoot(allocator); + const list_opt = fork_choice.aggregated_payloads.get(.{ .validator_id = 0, .data_root = data_root }); + try std.testing.expect(list_opt != null); + const list = list_opt.?; + try std.testing.expectEqual(@as(usize, 2), list.items.len); + + try std.testing.expect(try list.items[0].proof.participants.get(1)); + try std.testing.expect(!(try list.items[1].proof.participants.get(1))); +} + test "getCanonicalAncestorAtDepth and getCanonicalityAnalysis" { // ============================================================================ // COMPREHENSIVE TEST TREE @@ -1354,6 +1457,9 @@ test "getCanonicalAncestorAtDepth and getCanonicalityAnalysis" { .logger = module_logger, .gossip_signatures = SignaturesMap.init(allocator), .aggregated_payloads = AggregatedPayloadsMap.init(allocator), + .signatures_mutex = .{}, + .is_aggregator = false, + .local_subnets = LocalSubnetSet.initEmpty(), }; defer fork_choice.attestations.deinit(); defer fork_choice.deltas.deinit(); @@ -1670,6 +1776,9 @@ fn buildTestTreeWithMockChain(allocator: Allocator, mock_chain: anytype) !struct .logger = module_logger, .gossip_signatures = SignaturesMap.init(allocator), .aggregated_payloads = AggregatedPayloadsMap.init(allocator), + .signatures_mutex = .{}, + .is_aggregator = false, + .local_subnets = LocalSubnetSet.initEmpty(), }; return .{ @@ -2616,6 +2725,9 @@ test "rebase: heavy attestation load - all validators tracked correctly" { .logger = module_logger, .gossip_signatures = SignaturesMap.init(allocator), .aggregated_payloads = AggregatedPayloadsMap.init(allocator), + .signatures_mutex = .{}, + .is_aggregator = false, + .local_subnets = LocalSubnetSet.initEmpty(), }; // Note: We don't defer proto_array.nodes/indices.deinit() here because they're // moved into fork_choice and will be deinitialized separately diff --git a/pkgs/node/src/lib.zig b/pkgs/node/src/lib.zig index bd1457d7..897cf833 100644 --- a/pkgs/node/src/lib.zig +++ b/pkgs/node/src/lib.zig @@ -3,6 +3,7 @@ pub const Clock = clockFactory.Clock; const nodeFactory = @import("./node.zig"); pub const BeamNode = nodeFactory.BeamNode; +pub const buildGossipTopicSpecs = nodeFactory.buildGossipTopicSpecs; pub const chainFactory = @import("./chain.zig"); pub const fcFactory = @import("./forkchoice.zig"); diff --git a/pkgs/node/src/network.zig b/pkgs/node/src/network.zig index 8670fb3d..5afc10dc 100644 --- a/pkgs/node/src/network.zig +++ b/pkgs/node/src/network.zig @@ -129,8 +129,8 @@ pub const Network = struct { self.allocator.destroy(self.connected_peers); } - pub fn publish(self: *Self, data: *const networks.GossipMessage) !void { - return self.backend.gossip.publish(data); + pub fn publishWithTopic(self: *Self, topic: networks.GossipTopicSpec, data: *const networks.GossipMessage) !void { + return self.backend.gossip.publishWithTopic(topic, data); } pub fn sendStatus( diff --git a/pkgs/node/src/node.zig b/pkgs/node/src/node.zig index 9695fe9b..8a259877 100644 --- a/pkgs/node/src/node.zig +++ b/pkgs/node/src/node.zig @@ -27,6 +27,7 @@ const BlockByRootContext = networkFactory.BlockByRootContext; pub const NodeNameRegistry = networks.NodeNameRegistry; const ZERO_HASH = types.ZERO_HASH; +const ATTESTATION_COMMITTEE_COUNT: usize = @intCast(params.ATTESTATION_COMMITTEE_COUNT); const NodeOpts = struct { config: configs.ChainConfig, @@ -39,6 +40,7 @@ const NodeOpts = struct { db: database.Db, logger_config: *zeam_utils.ZeamLoggerConfig, node_registry: *const NodeNameRegistry, + is_aggregator: bool = false, }; pub const BeamNode = struct { @@ -91,6 +93,7 @@ pub const BeamNode = struct { }); chain.registerValidatorIds(ids); } + chain.setAggregator(opts.is_aggregator); self.* = Self{ .allocator = allocator, @@ -210,6 +213,15 @@ pub const BeamNode = struct { sender_node_name, }); }, + .aggregation => |signed_aggregation| { + const slot = signed_aggregation.data.slot; + const sender_node_name = self.node_registry.getNodeNameFromPeerId(sender_peer_id); + self.logger.info("received gossip aggregated attestation for slot={d} from peer={s}{}", .{ + slot, + sender_peer_id, + sender_node_name, + }); + }, } const result = self.chain.onGossip(data, sender_peer_id) catch |err| { @@ -859,6 +871,7 @@ pub const BeamNode = struct { return; } const interval: usize = @intCast(itime_intervals); + const interval_in_slot = interval % constants.INTERVALS_PER_SLOT; self.chain.onInterval(interval) catch |e| { self.logger.err("error ticking chain to time(intervals)={d} err={any}", .{ interval, e }); @@ -891,10 +904,36 @@ pub const BeamNode = struct { return e; }; }, + .aggregation => |signed_aggregation| { + self.publishAggregatedAttestation(signed_aggregation) catch |e| { + self.logger.err("error publishing aggregated attestation from validator: err={any}", .{e}); + return e; + }; + }, } } } } + + if (interval_in_slot == 2 and self.chain.is_aggregator) { + var aggregated = self.chain.aggregateCommitteeSignatures() catch |e| { + self.logger.err("error aggregating committee signatures at interval={d}: {any}", .{ interval, e }); + return e; + }; + defer { + for (aggregated.items) |*msg| { + msg.deinit(); + } + aggregated.deinit(); + } + + for (aggregated.items) |signed_aggregated| { + self.publishAggregatedAttestation(signed_aggregated) catch |e| { + self.logger.err("error publishing aggregated attestation: err={any}", .{e}); + return e; + }; + } + } } pub fn publishBlock(self: *Self, signed_block: types.SignedBlockWithAttestation) !void { @@ -930,7 +969,7 @@ pub const BeamNode = struct { // 2. publish gossip message const gossip_msg = networks.GossipMessage{ .block = signed_block }; - try self.network.publish(&gossip_msg); + try self.network.publishWithTopic(.{ .topic = .block }, &gossip_msg); self.logger.info("published block to network: slot={d} proposer={d}{}", .{ block.slot, block.proposer_index, @@ -954,7 +993,16 @@ pub const BeamNode = struct { // 2. publish gossip message const gossip_msg = networks.GossipMessage{ .attestation = signed_attestation }; - try self.network.publish(&gossip_msg); + comptime { + if (ATTESTATION_COMMITTEE_COUNT == 0) { + @compileError("ATTESTATION_COMMITTEE_COUNT must be > 0"); + } + } + const subnet_id = + @as(usize, @intCast(validator_id)) % ATTESTATION_COMMITTEE_COUNT; + const subnet_topic = networks.GossipTopicSpec{ .topic = .attestation, .subnet_id = subnet_id }; + try self.network.publishWithTopic(subnet_topic, &gossip_msg); + try self.network.publishWithTopic(.{ .topic = .attestation }, &gossip_msg); self.logger.info("published attestation to network: slot={d} validator={d}{}", .{ data.slot, @@ -963,6 +1011,12 @@ pub const BeamNode = struct { }); } + pub fn publishAggregatedAttestation(self: *Self, signed_attestation: types.SignedAggregatedAttestation) !void { + const gossip_msg = networks.GossipMessage{ .aggregation = signed_attestation }; + try self.network.publishWithTopic(.{ .topic = .aggregation }, &gossip_msg); + self.logger.info("published aggregated attestation to network: slot={d}", .{signed_attestation.data.slot}); + } + pub fn run(self: *Self) !void { // Catch up fork choice time to current interval before processing any requests. // This prevents FutureSlot errors when receiving blocks via RPC immediately after starting. @@ -973,8 +1027,10 @@ pub const BeamNode = struct { } const handler = try self.getOnGossipCbHandler(); - var topics = [_]networks.GossipTopic{ .block, .attestation }; - try self.network.backend.gossip.subscribe(&topics, handler); + const validator_ids = if (self.validator) |*validator| validator.ids else null; + const topic_specs = try buildGossipTopicSpecs(self.allocator, self.chain.config, validator_ids, self.chain.is_aggregator); + defer self.allocator.free(topic_specs); + try self.network.backend.gossip.subscribe(topic_specs, handler); const peer_handler = self.getPeerEventHandler(); try self.network.backend.peers.subscribe(peer_handler); @@ -987,6 +1043,41 @@ pub const BeamNode = struct { } }; +pub fn buildGossipTopicSpecs( + allocator: Allocator, + config: configs.ChainConfig, + validator_ids: ?[]const usize, + is_aggregator: bool, +) ![]networks.GossipTopicSpec { + _ = config; + var topics = std.ArrayList(networks.GossipTopicSpec).init(allocator); + errdefer topics.deinit(); + + try topics.append(.{ .topic = .block }); + try topics.append(.{ .topic = .attestation }); + try topics.append(.{ .topic = .aggregation }); + + if (is_aggregator) { + if (validator_ids) |ids| { + if (ATTESTATION_COMMITTEE_COUNT == 0) return topics.toOwnedSlice(); + const LocalSubnetSet = std.StaticBitSet(ATTESTATION_COMMITTEE_COUNT); + var local_subnets = LocalSubnetSet.initEmpty(); + + for (ids) |validator_id| { + const subnet_id = validator_id % ATTESTATION_COMMITTEE_COUNT; + local_subnets.set(subnet_id); + } + + var subnet_it = local_subnets.iterator(.{}); + while (subnet_it.next()) |subnet_id| { + try topics.append(.{ .topic = .attestation, .subnet_id = subnet_id }); + } + } + } + + return topics.toOwnedSlice(); +} + const xev = @import("xev"); test "Node peer tracking on connect/disconnect" { diff --git a/pkgs/node/src/validator_client.zig b/pkgs/node/src/validator_client.zig index 59df2c83..5db55467 100644 --- a/pkgs/node/src/validator_client.zig +++ b/pkgs/node/src/validator_client.zig @@ -80,6 +80,7 @@ pub const ValidatorClient = struct { 1 => return self.mayBeDoAttestation(slot), 2 => return null, 3 => return null, + 4 => return null, else => @panic("interval error"), } } diff --git a/pkgs/params/src/lib.zig b/pkgs/params/src/lib.zig index ebee3243..511515b6 100644 --- a/pkgs/params/src/lib.zig +++ b/pkgs/params/src/lib.zig @@ -17,6 +17,7 @@ pub const SECONDS_PER_SLOT = activePresetValues.SECONDS_PER_SLOT; pub const HISTORICAL_ROOTS_LIMIT = activePresetValues.HISTORICAL_ROOTS_LIMIT; pub const VALIDATOR_REGISTRY_LIMIT = activePresetValues.VALIDATOR_REGISTRY_LIMIT; pub const MAX_REQUEST_BLOCKS = activePresetValues.MAX_REQUEST_BLOCKS; +pub const ATTESTATION_COMMITTEE_COUNT = activePresetValues.ATTESTATION_COMMITTEE_COUNT; test "test preset loading" { try std.testing.expect(SECONDS_PER_SLOT == mainnetPreset.preset.SECONDS_PER_SLOT); diff --git a/pkgs/params/src/presets/mainnet.zig b/pkgs/params/src/presets/mainnet.zig index 7c514c9c..7028acfd 100644 --- a/pkgs/params/src/presets/mainnet.zig +++ b/pkgs/params/src/presets/mainnet.zig @@ -7,4 +7,5 @@ pub const preset = types.PresetConfig{ .HISTORICAL_ROOTS_LIMIT = 1 << 18, // 2^18 = 262144 .VALIDATOR_REGISTRY_LIMIT = 1 << 12, // 2^12 = 4096 .MAX_REQUEST_BLOCKS = 1024, + .ATTESTATION_COMMITTEE_COUNT = 1, }; diff --git a/pkgs/params/src/types.zig b/pkgs/params/src/types.zig index e471a7b0..467ac6cf 100644 --- a/pkgs/params/src/types.zig +++ b/pkgs/params/src/types.zig @@ -5,4 +5,5 @@ pub const PresetConfig = struct { HISTORICAL_ROOTS_LIMIT: u32, VALIDATOR_REGISTRY_LIMIT: u32, MAX_REQUEST_BLOCKS: u32, + ATTESTATION_COMMITTEE_COUNT: u32, }; diff --git a/pkgs/state-transition/src/mock.zig b/pkgs/state-transition/src/mock.zig index 034bd5bd..1b30322d 100644 --- a/pkgs/state-transition/src/mock.zig +++ b/pkgs/state-transition/src/mock.zig @@ -294,7 +294,7 @@ pub fn genMockChain(allocator: Allocator, numBlocks: usize, from_genesis: ?types ); } - // Compute aggregated signatures using the shared method + // Aggregate gossip signatures into proofs for block production var aggregation = try types.AggregatedAttestationsResult.init(allocator); var agg_att_cleanup = true; var agg_sig_cleanup = true; @@ -310,11 +310,10 @@ pub fn genMockChain(allocator: Allocator, numBlocks: usize, from_genesis: ?types } aggregation.attestation_signatures.deinit(); }; - try aggregation.computeAggregatedSignatures( + try aggregation.aggregateGossipSignatures( attestations.items, &beam_state.validators, &signatures_map, - null, // no pre-aggregated payloads in mock ); const proposer_index = slot % genesis_config.numValidators(); diff --git a/pkgs/types/src/attestation.zig b/pkgs/types/src/attestation.zig index 94e026cd..b6023ecc 100644 --- a/pkgs/types/src/attestation.zig +++ b/pkgs/types/src/attestation.zig @@ -5,6 +5,7 @@ const params = @import("@zeam/params"); const zeam_utils = @import("@zeam/utils"); const mini_3sf = @import("./mini_3sf.zig"); +const aggregation = @import("./aggregation.zig"); const utils = @import("./utils.zig"); const Allocator = std.mem.Allocator; @@ -143,6 +144,28 @@ pub const AggregatedAttestation = struct { } }; +pub const SignedAggregatedAttestation = struct { + data: AttestationData, + proof: aggregation.AggregatedSignatureProof, + + pub fn deinit(self: *SignedAggregatedAttestation) void { + self.proof.deinit(); + } + + pub fn toJson(self: *const SignedAggregatedAttestation, allocator: Allocator) !json.Value { + var obj = json.ObjectMap.init(allocator); + try obj.put("data", try self.data.toJson(allocator)); + try obj.put("proof", try self.proof.toJson(allocator)); + return json.Value{ .object = obj }; + } + + pub fn toJsonString(self: *const SignedAggregatedAttestation, allocator: Allocator) ![]const u8 { + var json_value = try self.toJson(allocator); + defer freeJsonValue(&json_value, allocator); + return utils.jsonToString(allocator, json_value); + } +}; + pub fn aggregationBitsEnsureLength(bits: *AggregationBits, target_len: usize) !void { while (bits.len() < target_len) { try bits.append(false); diff --git a/pkgs/types/src/block.zig b/pkgs/types/src/block.zig index 0fb43e20..c0116ddf 100644 --- a/pkgs/types/src/block.zig +++ b/pkgs/types/src/block.zig @@ -50,7 +50,10 @@ pub const SignaturesMap = std.AutoHashMap(SignatureKey, StoredSignature); /// Stored aggregated payload entry pub const StoredAggregatedPayload = struct { - slot: Slot, + /// Slot of the attested data (used for pruning). + attestation_slot: Slot, + /// Slot when the proof was learned (used for tie-break ordering). + source_slot: Slot, proof: aggregation.AggregatedSignatureProof, }; @@ -309,6 +312,54 @@ pub const AggregatedAttestationsResult = struct { allocator: Allocator, const Self = @This(); + const AttestationGroup = struct { + data: attestation.AttestationData, + data_root: Root, + validator_bits: std.DynamicBitSet, + + fn deinit(self: *AttestationGroup) void { + self.validator_bits.deinit(); + } + }; + + const CollectedGossipSignatures = struct { + signatures: std.ArrayList(xmss.Signature), + public_keys: std.ArrayList(xmss.PublicKey), + participants: attestation.AggregationBits, + participants_owned: bool, + + fn init(allocator: Allocator) !CollectedGossipSignatures { + var signatures = std.ArrayList(xmss.Signature).init(allocator); + errdefer signatures.deinit(); + var public_keys = std.ArrayList(xmss.PublicKey).init(allocator); + errdefer public_keys.deinit(); + var participants = try attestation.AggregationBits.init(allocator); + errdefer participants.deinit(); + + return .{ + .signatures = signatures, + .public_keys = public_keys, + .participants = participants, + .participants_owned = true, + }; + } + + fn deinit(self: *CollectedGossipSignatures) void { + for (self.signatures.items) |*sig| { + sig.deinit(); + } + self.signatures.deinit(); + + for (self.public_keys.items) |*pk| { + pk.deinit(); + } + self.public_keys.deinit(); + + if (self.participants_owned) { + self.participants.deinit(); + } + } + }; pub fn init(allocator: Allocator) !Self { var attestations_list = try AggregatedAttestations.init(allocator); @@ -324,48 +375,45 @@ pub const AggregatedAttestationsResult = struct { }; } - /// Compute aggregated signatures using three-phase algorithm: - /// Phase 1: Collect individual signatures from signatures_map (chain: gossip_signatures) - /// Phase 2: Fallback to aggregated_payloads using greedy set-cover (if provided) - /// Phase 3: Remove signatures which are already coverd by stored prrofs and aggregate remaining signatures - pub fn computeAggregatedSignatures( - self: *Self, - attestations_list: []const Attestation, - validators: *const Validators, - signatures_map: *const SignaturesMap, - aggregated_payloads: ?*const AggregatedPayloadsMap, - ) !void { - const allocator = self.allocator; + fn ensureAttestationAppendCapacity(self: *Self) !void { + const max = params.VALIDATOR_REGISTRY_LIMIT; + if (self.attestations.len() >= max or self.attestation_signatures.len() >= max) { + return error.Overflow; + } - // Group attestations by data root using bitsets for validator tracking - const AttestationGroup = struct { - data: attestation.AttestationData, - data_root: Root, - validator_bits: std.DynamicBitSet, - }; + try self.attestations.inner.ensureTotalCapacity(self.attestations.len() + 1); + try self.attestation_signatures.inner.ensureTotalCapacity(self.attestation_signatures.len() + 1); + } + fn buildAttestationGroups( + allocator: Allocator, + attestations_list: []const Attestation, + ) !std.ArrayList(AttestationGroup) { var groups = std.ArrayList(AttestationGroup).init(allocator); - defer { + errdefer { for (groups.items) |*group| { - group.validator_bits.deinit(); + group.deinit(); } groups.deinit(); } + // Map data_root -> index into groups so we can update existing bitsets in O(1). var root_indices = std.AutoHashMap(Root, usize).init(allocator); defer root_indices.deinit(); - // Group attestations by data root + // Group attestations by data_root and collect validator ids into bitsets. for (attestations_list) |att| { const data_root = try att.data.sszRoot(allocator); const vid: usize = @intCast(att.validator_id); if (root_indices.get(data_root)) |group_index| { var bits = &groups.items[group_index].validator_bits; + // Grow the bitset to fit this validator id if needed. if (vid >= bits.capacity()) { try bits.resize(vid + 1, false); } bits.set(vid); } else { + // First time seeing this data_root: seed the bitset with this validator id. var new_bits = try std.DynamicBitSet.initEmpty(allocator, vid + 1); new_bits.set(vid); try groups.append(.{ @@ -377,247 +425,309 @@ pub const AggregatedAttestationsResult = struct { } } - // Process each group + return groups; + } + + fn collectGossipSignaturesForGroup( + allocator: Allocator, + group: *const AttestationGroup, + validators: *const Validators, + signatures_map: *const SignaturesMap, + ) !?CollectedGossipSignatures { + var collected = try CollectedGossipSignatures.init(allocator); + errdefer collected.deinit(); + + // Collect all available signatures for the group's data_root. + var validator_it = group.validator_bits.iterator(.{}); + while (validator_it.next()) |validator_id| { + const vid: ValidatorIndex = @intCast(validator_id); + // Each validator contributes at most one signature per (validator_id, data_root). + const sig_entry = signatures_map.get(.{ .validator_id = vid, .data_root = group.data_root }) orelse { + continue; + }; + + // Skip missing signatures that are explicitly zeroed. + if (std.mem.eql(u8, &sig_entry.signature, &ZERO_SIGBYTES)) continue; + + // Deserialize signature and fetch the corresponding public key. + var sig = xmss.Signature.fromBytes(&sig_entry.signature) catch { + continue; + }; + + if (validator_id >= validators.len()) { + sig.deinit(); + continue; + } + + const val = validators.get(validator_id) catch { + sig.deinit(); + continue; + }; + + // Parse the validator's public key so we can aggregate with XMSS. + var pk = xmss.PublicKey.fromBytes(&val.pubkey) catch { + sig.deinit(); + continue; + }; + + collected.signatures.append(sig) catch |e| { + sig.deinit(); + pk.deinit(); + return e; + }; + collected.public_keys.append(pk) catch |e| { + pk.deinit(); + return e; + }; + + // Track participant membership in the aggregated proof. + try attestation.aggregationBitsSet(&collected.participants, validator_id, true); + } + + if (collected.signatures.items.len == 0) { + collected.deinit(); + return null; + } + + return collected; + } + + fn appendAggregatedProofForGroup( + self: *Self, + group: *const AttestationGroup, + message_hash: *const [32]u8, + collected: *CollectedGossipSignatures, + ) !void { + const allocator = self.allocator; + const epoch: u64 = group.data.slot; + + // Convert signatures and public keys into the handle arrays expected by XMSS. + var pk_handles = try allocator.alloc(*const xmss.HashSigPublicKey, collected.public_keys.items.len); + defer allocator.free(pk_handles); + var sig_handles = try allocator.alloc(*const xmss.HashSigSignature, collected.signatures.items.len); + defer allocator.free(sig_handles); + + // Stable ordering of handles follows the collected lists (validator iteration order). + for (collected.public_keys.items, 0..) |*pk, i| pk_handles[i] = pk.handle; + for (collected.signatures.items, 0..) |*sig, i| sig_handles[i] = sig.handle; + + // Aggregate into a single proof. This transfers ownership of participants to the proof. + var proof = try aggregation.AggregatedSignatureProof.init(allocator); + errdefer proof.deinit(); + try aggregation.AggregatedSignatureProof.aggregate( + collected.participants, + pk_handles, + sig_handles, + message_hash, + epoch, + &proof, + ); + collected.participants_owned = false; + + // Clone the participants bitlist for the aggregated attestation record. + var att_bits = try attestation.AggregationBits.init(allocator); + errdefer att_bits.deinit(); + // We keep att_bits in the block body while proof keeps its own participants list. + for (0..proof.participants.len()) |i| { + if (proof.participants.get(i) catch false) { + try attestation.aggregationBitsSet(&att_bits, i, true); + } + } + + try self.ensureAttestationAppendCapacity(); + self.attestations.inner.appendAssumeCapacity(.{ .aggregation_bits = att_bits, .data = group.data }); + self.attestation_signatures.inner.appendAssumeCapacity(proof); + } + + /// Aggregate individual gossip signatures into proofs (used by committee aggregators). + pub fn aggregateGossipSignatures( + self: *Self, + attestations_list: []const Attestation, + validators: *const Validators, + signatures_map: *const SignaturesMap, + ) !void { + const allocator = self.allocator; + + // Algorithm overview (group -> collect -> aggregate): + // 1) Group attestations by data_root and collect validator ids into a bitset per group. + // 2) For each group, pull any available per-validator signatures from signatures_map. + // 3) If we collected at least one signature, aggregate them into a single proof and + // emit a matching AggregatedAttestation for block inclusion/gossip. + + var groups = try buildAttestationGroups(allocator, attestations_list); + defer { + for (groups.items) |*group| { + group.deinit(); + } + groups.deinit(); + } + for (groups.items) |*group| { - const data_root = group.data_root; - const epoch: u64 = group.data.slot; var message_hash: [32]u8 = undefined; try zeam_utils.hashTreeRoot(attestation.AttestationData, group.data, &message_hash, allocator); - // Phase 1: Collect signatures from signatures_map - const max_validator = group.validator_bits.capacity(); + const collected_opt = try collectGossipSignaturesForGroup( + allocator, + group, + validators, + signatures_map, + ); + if (collected_opt == null) continue; + + var collected = collected_opt.?; + defer collected.deinit(); + try self.appendAggregatedProofForGroup(group, &message_hash, &collected); + } + } + + fn initRemainingValidators( + allocator: Allocator, + group: *const AttestationGroup, + ) !std.DynamicBitSet { + const max_validator = group.validator_bits.capacity(); + var remaining = try std.DynamicBitSet.initEmpty(allocator, max_validator); + + var init_it = group.validator_bits.iterator(.{}); + while (init_it.next()) |validator_id| { + if (validator_id >= remaining.capacity()) { + try remaining.resize(validator_id + 1, false); + } + remaining.set(validator_id); + } + + // "remaining" starts as the full participant set for this data_root. + return remaining; + } - var sigmap_sigs = std.ArrayList(xmss.Signature).init(allocator); - defer { - for (sigmap_sigs.items) |*sig| { - sig.deinit(); + fn selectBestProofForGroup( + remaining: *const std.DynamicBitSet, + remaining_count: usize, + candidates: *const AggregatedPayloadsList, + ) ?*const aggregation.AggregatedSignatureProof { + var best_proof: ?*const aggregation.AggregatedSignatureProof = null; + var max_coverage: usize = 0; + + // Choose the proof that covers the most remaining validators. + for (candidates.items) |*stored| { + const proof = &stored.proof; + const participants_len = proof.participants.len(); + + var coverage: usize = 0; + var remaining_it = remaining.iterator(.{}); + while (remaining_it.next()) |i| { + if (i >= participants_len) continue; + if (proof.participants.get(i) catch false) { + coverage += 1; } - sigmap_sigs.deinit(); } - var sigmap_pks = std.ArrayList(xmss.PublicKey).init(allocator); - defer { - for (sigmap_pks.items) |*pk| { - pk.deinit(); - } - sigmap_pks.deinit(); + if (coverage == 0) continue; + // Strictly greater keeps the first max-coverage proof (list order tie-break). + if (coverage > max_coverage) { + max_coverage = coverage; + best_proof = proof; + if (max_coverage == remaining_count) break; } + } - // Map from validator_id to index in signatures_map arrays - // Used to remove signatures from sigmap_sigs while aggregating which are already covered by stored proofs - var vid_to_sigmap_idx = try allocator.alloc(?usize, max_validator); - defer allocator.free(vid_to_sigmap_idx); - @memset(vid_to_sigmap_idx, null); + return best_proof; + } - // Bitsets for tracking validator states - var remaining = try std.DynamicBitSet.initEmpty(allocator, max_validator); - defer remaining.deinit(); + fn appendSelectedProofForGroup( + self: *Self, + group: *const AttestationGroup, + remaining: *std.DynamicBitSet, + remaining_count: *usize, + proof: *const aggregation.AggregatedSignatureProof, + ) !void { + const allocator = self.allocator; - var sigmap_available = try std.DynamicBitSet.initEmpty(allocator, max_validator); - defer sigmap_available.deinit(); - - // Track validators covered by stored proofs (to avoid redundancy with signatures_map) - var covered_by_stored = try std.DynamicBitSet.initEmpty(allocator, max_validator); - defer covered_by_stored.deinit(); - - // Attempt to collect each validator's signature from signatures_map - var validator_it = group.validator_bits.iterator(.{}); - while (validator_it.next()) |validator_id| { - const vid: ValidatorIndex = @intCast(validator_id); - if (signatures_map.get(.{ .validator_id = vid, .data_root = data_root })) |sig_entry| { - // Check if it's not a zero signature - if (!std.mem.eql(u8, &sig_entry.signature, &ZERO_SIGBYTES)) { - // Deserialize signature - var sig = xmss.Signature.fromBytes(&sig_entry.signature) catch { - remaining.set(validator_id); - continue; - }; - errdefer sig.deinit(); - - // Get public key from validator - if (validator_id >= validators.len()) { - sig.deinit(); - remaining.set(validator_id); - continue; - } - - const val = validators.get(validator_id) catch { - sig.deinit(); - remaining.set(validator_id); - continue; - }; - const pk = xmss.PublicKey.fromBytes(&val.pubkey) catch { - sig.deinit(); - remaining.set(validator_id); - continue; - }; - - vid_to_sigmap_idx[validator_id] = sigmap_sigs.items.len; - try sigmap_sigs.append(sig); - try sigmap_pks.append(pk); - sigmap_available.set(validator_id); - } else { - remaining.set(validator_id); - } - } else { - remaining.set(validator_id); + // Clone the stored proof so the block owns its copy. + var cloned_proof: aggregation.AggregatedSignatureProof = undefined; + try utils.sszClone(allocator, aggregation.AggregatedSignatureProof, proof.*, &cloned_proof); + errdefer cloned_proof.deinit(); + + var att_bits = try attestation.AggregationBits.init(allocator); + errdefer att_bits.deinit(); + + // Convert proof participants into aggregation bits and remove them from remaining. + for (0..cloned_proof.participants.len()) |i| { + if (cloned_proof.participants.get(i) catch false) { + try attestation.aggregationBitsSet(&att_bits, i, true); + if (i < remaining.capacity() and remaining.isSet(i)) { + remaining.unset(i); + remaining_count.* -= 1; } } + } - // Phase 2: Fallback to aggregated_payloads using greedy set-cover - if (aggregated_payloads) |agg_payloads| { - // Temporary bitset for computing coverage - var proof_bits = try std.DynamicBitSet.initEmpty(allocator, max_validator); - defer proof_bits.deinit(); + try self.ensureAttestationAppendCapacity(); + self.attestations.inner.appendAssumeCapacity(.{ .aggregation_bits = att_bits, .data = group.data }); + self.attestation_signatures.inner.appendAssumeCapacity(cloned_proof); + } + + /// Select aggregated proofs from stored payloads (used by proposers; no fallback). + pub fn selectAggregatedProofs( + self: *Self, + attestations_list: []const Attestation, + aggregated_payloads: ?*const AggregatedPayloadsMap, + ) !void { + const allocator = self.allocator; + + // Algorithm overview (greedy set cover with deterministic tie-break): + // 1) Group attestations by data_root and track participating validator ids in a bitset. + // 2) For each group, keep a "remaining" bitset of validators we still need to cover. + // 3) Repeatedly: + // a) Pick an arbitrary remaining validator (lowest set bit for determinism). + // b) Fetch candidate proofs indexed by (validator_id, data_root). + // c) Choose the proof that covers the most remaining validators. + // If coverage ties, keep the first proof in the list. + // The list is pre-ordered by source_slot (most recent first), + // so this tie-break is deterministic and prefers newer proofs. + // d) If the best proof has zero overlap, stop to avoid inconsistent results. + // e) Add the proof and remove its participants from remaining. + // Note: We do not aggregate fresh gossip signatures here. Only stored proofs are used. + + // Group attestations by data root using bitsets for validator tracking. + var groups = try buildAttestationGroups(allocator, attestations_list); + defer { + for (groups.items) |*group| { + group.deinit(); + } + groups.deinit(); + } + + // Process each group + for (groups.items) |*group| { + const data_root = group.data_root; + var remaining = try initRemainingValidators(allocator, group); + defer remaining.deinit(); + var remaining_count = remaining.count(); - while (remaining.count() > 0) { - // Pick any remaining validator to look up proofs + if (aggregated_payloads) |agg_payloads| { + while (remaining_count > 0) { + // Pick a deterministic target validator to drive lookup. const target_id = remaining.findFirstSet() orelse break; const vid: ValidatorIndex = @intCast(target_id); - // Remove the target_id from remaining if not covered by stored proofs + // Proofs are indexed by participant id and data root. const candidates = agg_payloads.get(.{ .validator_id = vid, .data_root = data_root }) orelse { remaining.unset(target_id); + remaining_count -= 1; continue; }; if (candidates.items.len == 0) { remaining.unset(target_id); + remaining_count -= 1; continue; } - // Find the proof covering the most remaining validators (greedy set-cover) - var best_proof: ?*const aggregation.AggregatedSignatureProof = null; - var max_coverage: usize = 0; - - for (candidates.items) |*stored| { - const proof = &stored.proof; - const max_participants = proof.participants.len(); - - // Reset and populate proof_bits from participants - proof_bits.setRangeValue(.{ .start = 0, .end = proof_bits.capacity() }, false); - if (max_participants > proof_bits.capacity()) { - try proof_bits.resize(max_participants, false); - } - - var coverage: usize = 0; - - for (0..max_participants) |i| { - if (proof.participants.get(i) catch false) { - // Count coverage of validators still in remaining (not yet covered by stored proofs) - if (i < remaining.capacity() and remaining.isSet(i)) { - proof_bits.set(i); - coverage += 1; - } - } - } - - if (coverage == 0) { - continue; - } - - if (coverage > max_coverage) { - max_coverage = coverage; - best_proof = proof; - } - } - - if (best_proof == null or max_coverage == 0) { - remaining.unset(target_id); - continue; - } - - // Clone and add the proof - var cloned_proof: aggregation.AggregatedSignatureProof = undefined; - try utils.sszClone(allocator, aggregation.AggregatedSignatureProof, best_proof.?.*, &cloned_proof); - errdefer cloned_proof.deinit(); - - // Create aggregated attestation matching the proof's participants - // and update tracking bitsets in a single pass - var att_bits = try attestation.AggregationBits.init(allocator); - errdefer att_bits.deinit(); - - for (0..cloned_proof.participants.len()) |i| { - if (cloned_proof.participants.get(i) catch false) { - try attestation.aggregationBitsSet(&att_bits, i, true); - if (i < remaining.capacity()) { - remaining.unset(i); - } - // Track ALL validators covered by stored proofs to remove from signatures_map later - if (i >= covered_by_stored.capacity()) { - try covered_by_stored.resize(i + 1, false); - } - covered_by_stored.set(i); - } - } - - try self.attestations.append(.{ .aggregation_bits = att_bits, .data = group.data }); - try self.attestation_signatures.append(cloned_proof); - } - } - - // Finally, aggregate signatures_map for validators NOT covered by stored proofs - // This avoids redundancy: if a validator is in a stored proof, don't include them in signatures_map aggregation - var usable_count: usize = 0; - var git = sigmap_available.iterator(.{}); - while (git.next()) |vid| { - if (vid >= covered_by_stored.capacity() or !covered_by_stored.isSet(vid)) { - usable_count += 1; - } - } - - if (usable_count > 0) { - var participants = try attestation.AggregationBits.init(allocator); - var participants_cleanup = true; - errdefer if (participants_cleanup) participants.deinit(); - - var pk_handles = try allocator.alloc(*const xmss.HashSigPublicKey, usable_count); - defer allocator.free(pk_handles); - var sig_handles = try allocator.alloc(*const xmss.HashSigSignature, usable_count); - defer allocator.free(sig_handles); - - // Iterate sigmap_available in order, skipping validators already in stored proofs - var handle_idx: usize = 0; - var git2 = sigmap_available.iterator(.{}); - while (git2.next()) |vid| { - // Skip if already covered by a stored proof - if (vid < covered_by_stored.capacity() and covered_by_stored.isSet(vid)) continue; - - try attestation.aggregationBitsSet(&participants, vid, true); - const sigmap_idx = vid_to_sigmap_idx[vid].?; - pk_handles[handle_idx] = sigmap_pks.items[sigmap_idx].handle; - sig_handles[handle_idx] = sigmap_sigs.items[sigmap_idx].handle; - handle_idx += 1; - } - - var proof = try aggregation.AggregatedSignatureProof.init(allocator); - errdefer proof.deinit(); - - try aggregation.AggregatedSignatureProof.aggregate( - participants, - pk_handles[0..handle_idx], - sig_handles[0..handle_idx], - &message_hash, - epoch, - &proof, - ); - participants_cleanup = false; // proof now owns participants buffer - - // Create aggregated attestation using proof's participants (which now owns the bits) - // We need to clone it since we're moving it into the attestation - var att_bits = try attestation.AggregationBits.init(allocator); - errdefer att_bits.deinit(); - - // Clone from proof.participants - const proof_participants_len = proof.participants.len(); - for (0..proof_participants_len) |i| { - if (proof.participants.get(i) catch false) { - try attestation.aggregationBitsSet(&att_bits, i, true); - } + const best_proof = selectBestProofForGroup(&remaining, remaining_count, &candidates) orelse { + break; + }; + try self.appendSelectedProofForGroup(group, &remaining, &remaining_count, best_proof); } - - try self.attestations.append(.{ .aggregation_bits = att_bits, .data = group.data }); - try self.attestation_signatures.append(proof); } } } @@ -873,3 +983,74 @@ test "encode decode signed block with non-empty attestation signatures" { const decoded_group = try decoded.signature.attestation_signatures.get(0); try std.testing.expect(decoded_group.participants.len() == 2); } + +test "selectAggregatedProofs: tie-break uses list order" { + const allocator = std.testing.allocator; + + const att_data = attestation.AttestationData{ + .slot = 5, + .head = .{ .root = ZERO_HASH, .slot = 5 }, + .target = .{ .root = ZERO_HASH, .slot = 5 }, + .source = .{ .root = ZERO_HASH, .slot = 0 }, + }; + const data_root = try att_data.sszRoot(allocator); + + var attestations_list = [_]Attestation{ + .{ .validator_id = 0, .data = att_data }, + .{ .validator_id = 1, .data = att_data }, + .{ .validator_id = 2, .data = att_data }, + }; + + var payloads_map = AggregatedPayloadsMap.init(allocator); + defer { + var it = payloads_map.valueIterator(); + while (it.next()) |list| { + for (list.items) |*item| { + item.proof.deinit(); + } + list.deinit(); + } + payloads_map.deinit(); + } + + var proof_a = try aggregation.AggregatedSignatureProof.init(allocator); + var proof_a_moved = false; + errdefer if (!proof_a_moved) proof_a.deinit(); + try attestation.aggregationBitsSet(&proof_a.participants, 0, true); + try attestation.aggregationBitsSet(&proof_a.participants, 1, true); + try attestation.aggregationBitsSet(&proof_a.participants, 2, false); + + var proof_b = try aggregation.AggregatedSignatureProof.init(allocator); + var proof_b_moved = false; + errdefer if (!proof_b_moved) proof_b.deinit(); + try attestation.aggregationBitsSet(&proof_b.participants, 0, true); + try attestation.aggregationBitsSet(&proof_b.participants, 1, false); + try attestation.aggregationBitsSet(&proof_b.participants, 2, true); + + const gop = try payloads_map.getOrPut(.{ .validator_id = 0, .data_root = data_root }); + if (!gop.found_existing) { + gop.value_ptr.* = AggregatedPayloadsList.init(allocator); + } + try gop.value_ptr.append(.{ + .attestation_slot = att_data.slot, + .source_slot = att_data.slot, + .proof = proof_a, + }); + proof_a_moved = true; + try gop.value_ptr.append(.{ + .attestation_slot = att_data.slot, + .source_slot = att_data.slot, + .proof = proof_b, + }); + proof_b_moved = true; + + var agg_ctx = try AggregatedAttestationsResult.init(allocator); + defer agg_ctx.deinit(); + try agg_ctx.selectAggregatedProofs(attestations_list[0..], &payloads_map); + + try std.testing.expectEqual(@as(usize, 1), agg_ctx.attestations.len()); + const selected = try agg_ctx.attestations.get(0); + try std.testing.expectEqual(@as(usize, 2), selected.aggregation_bits.len()); + try std.testing.expect(try selected.aggregation_bits.get(0)); + try std.testing.expect(try selected.aggregation_bits.get(1)); +} diff --git a/pkgs/types/src/block_signatures_testing.zig b/pkgs/types/src/block_signatures_testing.zig index 7cb4758f..e5c559d6 100644 --- a/pkgs/types/src/block_signatures_testing.zig +++ b/pkgs/types/src/block_signatures_testing.zig @@ -25,7 +25,7 @@ const AggregatedAttestationsResult = block.AggregatedAttestationsResult; const AggregatedPayloadsList = block.AggregatedPayloadsList; // ============================================================================ -// Test helpers for computeAggregatedSignatures +// Test helpers for aggregation helpers // ============================================================================ const keymanager = @import("@zeam/key-manager"); @@ -194,7 +194,8 @@ const TestContext = struct { gop.value_ptr.* = AggregatedPayloadsList.init(self.allocator); } try gop.value_ptr.append(.{ - .slot = self.attestation_data.slot, + .attestation_slot = self.attestation_data.slot, + .source_slot = self.attestation_data.slot, .proof = proof, }); } @@ -235,15 +236,14 @@ fn deinitPayloadsMap(map: *AggregatedPayloadsMap) void { } // ============================================================================ -// Test 1: All 4 signatures in signatures_map (pure signatures_map) +// aggregateGossipSignatures tests // ============================================================================ -test "computeAggregatedSignatures: all 4 in signatures_map" { +test "aggregateGossipSignatures: all 4 in signatures_map" { const allocator = std.testing.allocator; var ctx = try TestContext.init(allocator, 4); defer ctx.deinit(); - // Create attestations for all 4 validators var attestations_list = [_]attestation.Attestation{ ctx.createAttestation(0), ctx.createAttestation(1), @@ -251,7 +251,6 @@ test "computeAggregatedSignatures: all 4 in signatures_map" { ctx.createAttestation(3), }; - // Add all 4 signatures to signatures_map var signatures_map = SignaturesMap.init(allocator); defer deinitSignaturesMap(&signatures_map); @@ -260,22 +259,15 @@ test "computeAggregatedSignatures: all 4 in signatures_map" { try ctx.addToSignatureMap(&signatures_map, 2); try ctx.addToSignatureMap(&signatures_map, 3); - // No aggregated payloads - var payloads_map = AggregatedPayloadsMap.init(allocator); - defer deinitPayloadsMap(&payloads_map); - - // Create aggregation context and compute var agg_ctx = try AggregatedAttestationsResult.init(allocator); defer agg_ctx.deinit(); - try agg_ctx.computeAggregatedSignatures( - &attestations_list, + try agg_ctx.aggregateGossipSignatures( + attestations_list[0..], &ctx.validators, &signatures_map, - &payloads_map, ); - // Should have exactly 1 aggregated attestation covering all 4 validators try std.testing.expectEqual(@as(usize, 1), agg_ctx.attestations.len()); try std.testing.expectEqual(@as(usize, 1), agg_ctx.attestation_signatures.len()); @@ -284,345 +276,52 @@ test "computeAggregatedSignatures: all 4 in signatures_map" { } // ============================================================================ -// Test 2: 2 in signatures_map, 2 in aggregated_proof (clean split) -// ============================================================================ -test "computeAggregatedSignatures: 2 signatures_map, 2 in aggregated proof" { - const allocator = std.testing.allocator; - - var ctx = try TestContext.init(allocator, 4); - defer ctx.deinit(); - - // Create attestations for all 4 validators - var attestations_list = [_]attestation.Attestation{ - ctx.createAttestation(0), - ctx.createAttestation(1), - ctx.createAttestation(2), - ctx.createAttestation(3), - }; - - // Add signatures for validators 0, 1 only - var signatures_map = SignaturesMap.init(allocator); - defer deinitSignaturesMap(&signatures_map); - - try ctx.addToSignatureMap(&signatures_map, 0); - try ctx.addToSignatureMap(&signatures_map, 1); - - // Create aggregated proof for validators 2, 3 - var payloads_map = AggregatedPayloadsMap.init(allocator); - defer deinitPayloadsMap(&payloads_map); - - const proof_2_3 = try ctx.createAggregatedProof(&[_]ValidatorIndex{ 2, 3 }); - // Add to both validator 2 and 3's lookup - try ctx.addAggregatedPayload(&payloads_map, 2, proof_2_3); - - // Create aggregation context and compute - var agg_ctx = try AggregatedAttestationsResult.init(allocator); - defer agg_ctx.deinit(); - - try agg_ctx.computeAggregatedSignatures( - &attestations_list, - &ctx.validators, - &signatures_map, - &payloads_map, - ); - - // Should have exactly 2 aggregated attestations - try std.testing.expectEqual(@as(usize, 2), agg_ctx.attestations.len()); - try std.testing.expectEqual(@as(usize, 2), agg_ctx.attestation_signatures.len()); - - // Verify one covers 2,3 and one covers 0,1 - var found_0_1 = false; - var found_2_3 = false; - - for (0..agg_ctx.attestations.len()) |i| { - const att_bits = &(try agg_ctx.attestations.get(i)).aggregation_bits; - if (try TestContext.checkParticipants(att_bits, &[_]ValidatorIndex{ 0, 1 })) { - found_0_1 = true; - } - if (try TestContext.checkParticipants(att_bits, &[_]ValidatorIndex{ 2, 3 })) { - found_2_3 = true; - } - } - - try std.testing.expect(found_0_1); - try std.testing.expect(found_2_3); -} - -// ============================================================================ -// Test 3: 2 in signatures_map, all 4 in aggregated_proof (full overlap - no redundancy) -// When stored proof covers ALL validators, signatures_map aggregation is skipped +// aggregateGossipSignatures: missing signatures excluded // ============================================================================ -test "computeAggregatedSignatures: full overlap uses stored only" { +test "aggregateGossipSignatures: missing signatures excluded" { const allocator = std.testing.allocator; - var ctx = try TestContext.init(allocator, 4); + var ctx = try TestContext.init(allocator, 5); defer ctx.deinit(); - // Create attestations for all 4 validators var attestations_list = [_]attestation.Attestation{ - ctx.createAttestation(0), ctx.createAttestation(1), ctx.createAttestation(2), ctx.createAttestation(3), + ctx.createAttestation(4), }; - // Add signatures for validators 0, 1 only var signatures_map = SignaturesMap.init(allocator); defer deinitSignaturesMap(&signatures_map); - try ctx.addToSignatureMap(&signatures_map, 0); try ctx.addToSignatureMap(&signatures_map, 1); + try ctx.addToSignatureMap(&signatures_map, 3); - // Create aggregated proof for ALL 4 validators (fully covers 0,1) - var payloads_map = AggregatedPayloadsMap.init(allocator); - defer deinitPayloadsMap(&payloads_map); - - const proof_all = try ctx.createAggregatedProof(&[_]ValidatorIndex{ 0, 1, 2, 3 }); - try ctx.addAggregatedPayload(&payloads_map, 2, proof_all); - - // Create aggregation context and compute var agg_ctx = try AggregatedAttestationsResult.init(allocator); defer agg_ctx.deinit(); - try agg_ctx.computeAggregatedSignatures( - &attestations_list, + try agg_ctx.aggregateGossipSignatures( + attestations_list[0..], &ctx.validators, &signatures_map, - &payloads_map, ); - // Should have only 1 aggregated attestation: - // - Stored proof covering {0,1,2,3} - // - signatures_map {0,1} is NOT included because all validators are covered by stored proof try std.testing.expectEqual(@as(usize, 1), agg_ctx.attestations.len()); try std.testing.expectEqual(@as(usize, 1), agg_ctx.attestation_signatures.len()); const att_bits = &(try agg_ctx.attestations.get(0)).aggregation_bits; - try std.testing.expect(try TestContext.checkParticipants(att_bits, &[_]ValidatorIndex{ 0, 1, 2, 3 })); + try std.testing.expect(try TestContext.checkParticipants(att_bits, &[_]ValidatorIndex{ 1, 3 })); } // ============================================================================ -// Test 4: Greedy set-cover with competing proofs +// aggregateGossipSignatures: multiple data roots // ============================================================================ -test "computeAggregatedSignatures: greedy set-cover" { +test "aggregateGossipSignatures: multiple data roots" { const allocator = std.testing.allocator; var ctx = try TestContext.init(allocator, 4); defer ctx.deinit(); - // Create attestations for all 4 validators - var attestations_list = [_]attestation.Attestation{ - ctx.createAttestation(0), - ctx.createAttestation(1), - ctx.createAttestation(2), - ctx.createAttestation(3), - }; - - // Add signature only for validator 0 - var signatures_map = SignaturesMap.init(allocator); - defer deinitSignaturesMap(&signatures_map); - - try ctx.addToSignatureMap(&signatures_map, 0); - - // Create competing aggregated proofs: - // Proof A: covers 1,2,3 (optimal) - // Proof B: covers 1,2 (suboptimal) - // Proof C: covers 2,3 (suboptimal) - var payloads_map = AggregatedPayloadsMap.init(allocator); - defer deinitPayloadsMap(&payloads_map); - - const proof_a = try ctx.createAggregatedProof(&[_]ValidatorIndex{ 1, 2, 3 }); - const proof_b = try ctx.createAggregatedProof(&[_]ValidatorIndex{ 1, 2 }); - - // Add proof A and B for validator 1 lookup - try ctx.addAggregatedPayload(&payloads_map, 1, proof_a); - try ctx.addAggregatedPayload(&payloads_map, 1, proof_b); - - // Create aggregation context and compute - var agg_ctx = try AggregatedAttestationsResult.init(allocator); - defer agg_ctx.deinit(); - - try agg_ctx.computeAggregatedSignatures( - &attestations_list, - &ctx.validators, - &signatures_map, - &payloads_map, - ); - - // Should have exactly 2 aggregated attestations: - // 1. signatures_map for validator 0 - // 2. Aggregated proof A for validators 1,2,3 - try std.testing.expectEqual(@as(usize, 2), agg_ctx.attestations.len()); - try std.testing.expectEqual(@as(usize, 2), agg_ctx.attestation_signatures.len()); - - // Verify one covers 0 and one covers 1,2,3 - var found_0 = false; - var found_1_2_3 = false; - - for (0..agg_ctx.attestations.len()) |i| { - const att_bits = &(try agg_ctx.attestations.get(i)).aggregation_bits; - if (try TestContext.checkParticipants(att_bits, &[_]ValidatorIndex{0})) { - found_0 = true; - } - if (try TestContext.checkParticipants(att_bits, &[_]ValidatorIndex{ 1, 2, 3 })) { - found_1_2_3 = true; - } - } - - try std.testing.expect(found_0); - try std.testing.expect(found_1_2_3); -} - -// ============================================================================ -// Test 5: Partial signatures_map overlap with stored proof (maximize coverage) -// signatures_map {1,2} + Stored {2,3,4} = Both included for maximum coverage {1,2,3,4} -// ============================================================================ -test "computeAggregatedSignatures: partial signatures_map overlap maximizes coverage" { - const allocator = std.testing.allocator; - - var ctx = try TestContext.init(allocator, 5); - defer ctx.deinit(); - - // Create attestations for validators 1,2,3,4 - var attestations_list = [_]attestation.Attestation{ - ctx.createAttestation(1), - ctx.createAttestation(2), - ctx.createAttestation(3), - ctx.createAttestation(4), - }; - - // Add signatures_map for validators 1, 2 only - var signatures_map = SignaturesMap.init(allocator); - defer deinitSignaturesMap(&signatures_map); - - try ctx.addToSignatureMap(&signatures_map, 1); - try ctx.addToSignatureMap(&signatures_map, 2); - - // Create aggregated proof for validators 2, 3, 4 (overlaps with signatures_map on 2) - var payloads_map = AggregatedPayloadsMap.init(allocator); - defer deinitPayloadsMap(&payloads_map); - - const proof_2_3_4 = try ctx.createAggregatedProof(&[_]ValidatorIndex{ 2, 3, 4 }); - try ctx.addAggregatedPayload(&payloads_map, 3, proof_2_3_4); - - // Create aggregation context and compute - var agg_ctx = try AggregatedAttestationsResult.init(allocator); - defer agg_ctx.deinit(); - - try agg_ctx.computeAggregatedSignatures( - &attestations_list, - &ctx.validators, - &signatures_map, - &payloads_map, - ); - - // Should have 2 aggregated attestations: - // 1. Stored proof covering {2,3,4} - // 2. signatures_map aggregation covering {1} only (validator 2 excluded - already in stored proof) - // Together they cover {1,2,3,4} without redundancy - try std.testing.expectEqual(@as(usize, 2), agg_ctx.attestations.len()); - try std.testing.expectEqual(@as(usize, 2), agg_ctx.attestation_signatures.len()); - - // Verify both aggregations exist - var found_1 = false; - var found_2_3_4 = false; - - for (0..agg_ctx.attestations.len()) |i| { - const att_bits = &(try agg_ctx.attestations.get(i)).aggregation_bits; - if (try TestContext.checkParticipants(att_bits, &[_]ValidatorIndex{1})) { - found_1 = true; - } - if (try TestContext.checkParticipants(att_bits, &[_]ValidatorIndex{ 2, 3, 4 })) { - found_2_3_4 = true; - } - } - - try std.testing.expect(found_1); - try std.testing.expect(found_2_3_4); -} - -// ============================================================================ -// Test 6: Empty attestations list -// ============================================================================ -test "computeAggregatedSignatures: empty attestations" { - const allocator = std.testing.allocator; - - var ctx = try TestContext.init(allocator, 4); - defer ctx.deinit(); - - var attestations_list = [_]attestation.Attestation{}; - - var signatures_map = SignaturesMap.init(allocator); - defer deinitSignaturesMap(&signatures_map); - - var payloads_map = AggregatedPayloadsMap.init(allocator); - defer deinitPayloadsMap(&payloads_map); - - var agg_ctx = try AggregatedAttestationsResult.init(allocator); - defer agg_ctx.deinit(); - - try agg_ctx.computeAggregatedSignatures( - &attestations_list, - &ctx.validators, - &signatures_map, - &payloads_map, - ); - - // Should have no attestations - try std.testing.expectEqual(@as(usize, 0), agg_ctx.attestations.len()); - try std.testing.expectEqual(@as(usize, 0), agg_ctx.attestation_signatures.len()); -} - -// ============================================================================ -// Test 7: No signatures available -// ============================================================================ -test "computeAggregatedSignatures: no signatures available" { - const allocator = std.testing.allocator; - - var ctx = try TestContext.init(allocator, 4); - defer ctx.deinit(); - - // Create attestations for all 4 validators - var attestations_list = [_]attestation.Attestation{ - ctx.createAttestation(0), - ctx.createAttestation(1), - ctx.createAttestation(2), - ctx.createAttestation(3), - }; - - // No signatures_map signatures - var signatures_map = SignaturesMap.init(allocator); - defer deinitSignaturesMap(&signatures_map); - - // No aggregated payloads - var payloads_map = AggregatedPayloadsMap.init(allocator); - defer deinitPayloadsMap(&payloads_map); - - var agg_ctx = try AggregatedAttestationsResult.init(allocator); - defer agg_ctx.deinit(); - - try agg_ctx.computeAggregatedSignatures( - &attestations_list, - &ctx.validators, - &signatures_map, - &payloads_map, - ); - - // Should have no attestations (all validators uncovered) - try std.testing.expectEqual(@as(usize, 0), agg_ctx.attestations.len()); - try std.testing.expectEqual(@as(usize, 0), agg_ctx.attestation_signatures.len()); -} - -// ============================================================================ -// Test 8: Multiple data roots (separate groups) -// ============================================================================ -test "computeAggregatedSignatures: multiple data roots" { - const allocator = std.testing.allocator; - - var ctx = try TestContext.init(allocator, 4); - defer ctx.deinit(); - - // Create second attestation data with different slot const att_data_2 = attestation.AttestationData{ .slot = 10, .head = .{ .root = [_]u8{2} ** 32, .slot = 10 }, @@ -631,23 +330,19 @@ test "computeAggregatedSignatures: multiple data roots" { }; const data_root_2 = try att_data_2.sszRoot(allocator); - // Create attestations: 0,1 with data_root_1, 2,3 with data_root_2 var attestations_list = [_]attestation.Attestation{ - ctx.createAttestation(0), // data_root_1 - ctx.createAttestation(1), // data_root_1 - ctx.createAttestationWithData(2, att_data_2), // data_root_2 - ctx.createAttestationWithData(3, att_data_2), // data_root_2 + ctx.createAttestation(0), + ctx.createAttestation(1), + ctx.createAttestationWithData(2, att_data_2), + ctx.createAttestationWithData(3, att_data_2), }; - // Add signatures_map signatures for all var signatures_map = SignaturesMap.init(allocator); defer deinitSignaturesMap(&signatures_map); - // Signatures for group 1 (data_root_1) try ctx.addToSignatureMap(&signatures_map, 0); try ctx.addToSignatureMap(&signatures_map, 1); - // Signatures for group 2 (data_root_2) - need to sign with different data const att_2 = attestations_list[2]; const sig_bytes_2 = try ctx.key_manager.signAttestation(&att_2, allocator); try signatures_map.put( @@ -662,24 +357,18 @@ test "computeAggregatedSignatures: multiple data roots" { .{ .slot = att_data_2.slot, .signature = sig_bytes_3 }, ); - var payloads_map = AggregatedPayloadsMap.init(allocator); - defer deinitPayloadsMap(&payloads_map); - var agg_ctx = try AggregatedAttestationsResult.init(allocator); defer agg_ctx.deinit(); - try agg_ctx.computeAggregatedSignatures( - &attestations_list, + try agg_ctx.aggregateGossipSignatures( + attestations_list[0..], &ctx.validators, &signatures_map, - &payloads_map, ); - // Should have exactly 2 aggregated attestations (one per data root) try std.testing.expectEqual(@as(usize, 2), agg_ctx.attestations.len()); try std.testing.expectEqual(@as(usize, 2), agg_ctx.attestation_signatures.len()); - // Verify one covers 0,1 and one covers 2,3 var found_0_1 = false; var found_2_3 = false; @@ -698,62 +387,90 @@ test "computeAggregatedSignatures: multiple data roots" { } // ============================================================================ -// Test 9: Single validator attestation +// selectAggregatedProofs tests // ============================================================================ -test "computeAggregatedSignatures: single validator" { +test "selectAggregatedProofs: greedy set-cover" { const allocator = std.testing.allocator; - var ctx = try TestContext.init(allocator, 1); + var ctx = try TestContext.init(allocator, 4); defer ctx.deinit(); - // Create attestation for single validator var attestations_list = [_]attestation.Attestation{ - ctx.createAttestation(0), + ctx.createAttestation(1), + ctx.createAttestation(2), + ctx.createAttestation(3), }; - // Add signatures_map signature - var signatures_map = SignaturesMap.init(allocator); - defer deinitSignaturesMap(&signatures_map); + var payloads_map = AggregatedPayloadsMap.init(allocator); + defer deinitPayloadsMap(&payloads_map); - try ctx.addToSignatureMap(&signatures_map, 0); + const proof_a = try ctx.createAggregatedProof(&[_]ValidatorIndex{ 1, 2, 3 }); + const proof_b = try ctx.createAggregatedProof(&[_]ValidatorIndex{ 1, 2 }); + + try ctx.addAggregatedPayload(&payloads_map, 1, proof_a); + try ctx.addAggregatedPayload(&payloads_map, 1, proof_b); + + var agg_ctx = try AggregatedAttestationsResult.init(allocator); + defer agg_ctx.deinit(); + + try agg_ctx.selectAggregatedProofs( + attestations_list[0..], + &payloads_map, + ); + + try std.testing.expectEqual(@as(usize, 1), agg_ctx.attestations.len()); + try std.testing.expectEqual(@as(usize, 1), agg_ctx.attestation_signatures.len()); + + const att_bits = &(try agg_ctx.attestations.get(0)).aggregation_bits; + try std.testing.expect(try TestContext.checkParticipants(att_bits, &[_]ValidatorIndex{ 1, 2, 3 })); +} + +// ============================================================================ +// selectAggregatedProofs: partial coverage excludes missing +// ============================================================================ +test "selectAggregatedProofs: partial coverage excludes missing" { + const allocator = std.testing.allocator; + + var ctx = try TestContext.init(allocator, 5); + defer ctx.deinit(); + + var attestations_list = [_]attestation.Attestation{ + ctx.createAttestation(1), + ctx.createAttestation(2), + ctx.createAttestation(3), + ctx.createAttestation(4), + }; var payloads_map = AggregatedPayloadsMap.init(allocator); defer deinitPayloadsMap(&payloads_map); + const proof_2_3 = try ctx.createAggregatedProof(&[_]ValidatorIndex{ 2, 3 }); + try ctx.addAggregatedPayload(&payloads_map, 2, proof_2_3); + var agg_ctx = try AggregatedAttestationsResult.init(allocator); defer agg_ctx.deinit(); - try agg_ctx.computeAggregatedSignatures( - &attestations_list, - &ctx.validators, - &signatures_map, + try agg_ctx.selectAggregatedProofs( + attestations_list[0..], &payloads_map, ); - // Should have exactly 1 aggregated attestation with 1 validator try std.testing.expectEqual(@as(usize, 1), agg_ctx.attestations.len()); try std.testing.expectEqual(@as(usize, 1), agg_ctx.attestation_signatures.len()); const att_bits = &(try agg_ctx.attestations.get(0)).aggregation_bits; - try std.testing.expect(try TestContext.checkParticipants(att_bits, &[_]ValidatorIndex{0})); + try std.testing.expect(try TestContext.checkParticipants(att_bits, &[_]ValidatorIndex{ 2, 3 })); } // ============================================================================ -// Test 10: Complex scenario with 3 attestation_data types -// - Group 1: All validators have signatures_map signatures (pure signatures_map) -// - Group 2: All validators covered by aggregated_payload only (pure stored) -// - Group 3: Overlap - some signatures_map + stored proof covering some signatures_map validators +// selectAggregatedProofs: multiple data roots // ============================================================================ -test "computeAggregatedSignatures: complex 3 groups" { +test "selectAggregatedProofs: multiple data roots" { const allocator = std.testing.allocator; - // Need 10 validators for this test - var ctx = try TestContext.init(allocator, 10); + var ctx = try TestContext.init(allocator, 4); defer ctx.deinit(); - // Create 3 different attestation data types - const att_data_1 = ctx.attestation_data; // slot 5 (uses ctx.data_root for signatures_map) - const att_data_2 = attestation.AttestationData{ .slot = 10, .head = .{ .root = [_]u8{2} ** 32, .slot = 10 }, @@ -762,65 +479,20 @@ test "computeAggregatedSignatures: complex 3 groups" { }; const data_root_2 = try att_data_2.sszRoot(allocator); - const att_data_3 = attestation.AttestationData{ - .slot = 15, - .head = .{ .root = [_]u8{3} ** 32, .slot = 15 }, - .target = .{ .root = [_]u8{3} ** 32, .slot = 15 }, - .source = .{ .root = ZERO_HASH, .slot = 0 }, - }; - const data_root_3 = try att_data_3.sszRoot(allocator); - - // Create attestations for all groups: - // Group 1 (data_root_1): validators 0,1,2 - pure signatures_map - // Group 2 (data_root_2): validators 3,4,5 - pure stored - // Group 3 (data_root_3): validators 6,7,8,9 - overlap (signatures_map 6,7 + stored 7,8,9) var attestations_list = [_]attestation.Attestation{ - // Group 1 - ctx.createAttestationWithData(0, att_data_1), - ctx.createAttestationWithData(1, att_data_1), - ctx.createAttestationWithData(2, att_data_1), - // Group 2 + ctx.createAttestation(0), + ctx.createAttestation(1), + ctx.createAttestationWithData(2, att_data_2), ctx.createAttestationWithData(3, att_data_2), - ctx.createAttestationWithData(4, att_data_2), - ctx.createAttestationWithData(5, att_data_2), - // Group 3 - ctx.createAttestationWithData(6, att_data_3), - ctx.createAttestationWithData(7, att_data_3), - ctx.createAttestationWithData(8, att_data_3), - ctx.createAttestationWithData(9, att_data_3), }; - var signatures_map = SignaturesMap.init(allocator); - defer deinitSignaturesMap(&signatures_map); - - // Group 1: Add signatures_map signatures for validators 0,1,2 - try ctx.addToSignatureMap(&signatures_map, 0); - try ctx.addToSignatureMap(&signatures_map, 1); - try ctx.addToSignatureMap(&signatures_map, 2); - - // Group 2: No signatures_map signatures (all from stored) - - // Group 3: Add signatures_map signatures for validators 6,7 only - const att_6 = attestations_list[6]; - const sig_bytes_6 = try ctx.key_manager.signAttestation(&att_6, allocator); - try signatures_map.put( - .{ .validator_id = 6, .data_root = data_root_3 }, - .{ .slot = att_data_3.slot, .signature = sig_bytes_6 }, - ); - - const att_7 = attestations_list[7]; - const sig_bytes_7 = try ctx.key_manager.signAttestation(&att_7, allocator); - try signatures_map.put( - .{ .validator_id = 7, .data_root = data_root_3 }, - .{ .slot = att_data_3.slot, .signature = sig_bytes_7 }, - ); - var payloads_map = AggregatedPayloadsMap.init(allocator); defer deinitPayloadsMap(&payloads_map); - // Group 2: Create aggregated proof for validators 3,4,5 + const proof_0_1 = try ctx.createAggregatedProof(&[_]ValidatorIndex{ 0, 1 }); + try ctx.addAggregatedPayload(&payloads_map, 0, proof_0_1); + { - // Need to create proof with att_data_2 var sigs = std.ArrayList(xmss.Signature).init(allocator); defer { for (sigs.items) |*sig| sig.deinit(); @@ -832,7 +504,7 @@ test "computeAggregatedSignatures: complex 3 groups" { pks.deinit(); } - for ([_]ValidatorIndex{ 3, 4, 5 }) |vid| { + for ([_]ValidatorIndex{ 2, 3 }) |vid| { const att = attestations_list[vid]; const sig_bytes = try ctx.key_manager.signAttestation(&att, allocator); var sig = try xmss.Signature.fromBytes(&sig_bytes); @@ -844,9 +516,9 @@ test "computeAggregatedSignatures: complex 3 groups" { try pks.append(pk); } - var pk_handles = try allocator.alloc(*const xmss.HashSigPublicKey, 3); + var pk_handles = try allocator.alloc(*const xmss.HashSigPublicKey, 2); defer allocator.free(pk_handles); - var sig_handles = try allocator.alloc(*const xmss.HashSigSignature, 3); + var sig_handles = try allocator.alloc(*const xmss.HashSigSignature, 2); defer allocator.free(sig_handles); for (pks.items, 0..) |*pk, i| pk_handles[i] = pk.handle; @@ -854,7 +526,7 @@ test "computeAggregatedSignatures: complex 3 groups" { var participants = try attestation.AggregationBits.init(allocator); errdefer participants.deinit(); - for ([_]ValidatorIndex{ 3, 4, 5 }) |vid| { + for ([_]ValidatorIndex{ 2, 3 }) |vid| { try attestation.aggregationBitsSet(&participants, @intCast(vid), true); } @@ -873,8 +545,7 @@ test "computeAggregatedSignatures: complex 3 groups" { &proof, ); - // Add to payloads_map for validator 3 - const key = SignatureKey{ .validator_id = 3, .data_root = data_root_2 }; + const key = SignatureKey{ .validator_id = 2, .data_root = data_root_2 }; const gop = try payloads_map.getOrPut(key); if (!gop.found_existing) { gop.value_ptr.* = AggregatedPayloadsList.init(allocator); @@ -882,237 +553,94 @@ test "computeAggregatedSignatures: complex 3 groups" { try gop.value_ptr.append(.{ .slot = att_data_2.slot, .proof = proof }); } - // Group 3: Create aggregated proof for validators 7,8,9 (overlaps with signatures_map on 7) - { - var sigs = std.ArrayList(xmss.Signature).init(allocator); - defer { - for (sigs.items) |*sig| sig.deinit(); - sigs.deinit(); - } - var pks = std.ArrayList(xmss.PublicKey).init(allocator); - defer { - for (pks.items) |*pk| pk.deinit(); - pks.deinit(); - } - - for ([_]ValidatorIndex{ 7, 8, 9 }) |vid| { - const att = attestations_list[vid]; - const sig_bytes = try ctx.key_manager.signAttestation(&att, allocator); - var sig = try xmss.Signature.fromBytes(&sig_bytes); - errdefer sig.deinit(); - const val = try ctx.validators.get(@intCast(vid)); - var pk = try xmss.PublicKey.fromBytes(&val.pubkey); - errdefer pk.deinit(); - try sigs.append(sig); - try pks.append(pk); - } - - var pk_handles = try allocator.alloc(*const xmss.HashSigPublicKey, 3); - defer allocator.free(pk_handles); - var sig_handles = try allocator.alloc(*const xmss.HashSigSignature, 3); - defer allocator.free(sig_handles); - - for (pks.items, 0..) |*pk, i| pk_handles[i] = pk.handle; - for (sigs.items, 0..) |*sig, i| sig_handles[i] = sig.handle; - - var participants = try attestation.AggregationBits.init(allocator); - errdefer participants.deinit(); - for ([_]ValidatorIndex{ 7, 8, 9 }) |vid| { - try attestation.aggregationBitsSet(&participants, @intCast(vid), true); - } - - var message_hash: [32]u8 = undefined; - try zeam_utils.hashTreeRoot(attestation.AttestationData, att_data_3, &message_hash, allocator); - - var proof = try aggregation.AggregatedSignatureProof.init(allocator); - errdefer proof.deinit(); - - try aggregation.AggregatedSignatureProof.aggregate( - participants, - pk_handles, - sig_handles, - &message_hash, - att_data_3.slot, - &proof, - ); - - // Add to payloads_map for validator 8 (one of the remaining signatures_map validators) - const key = SignatureKey{ .validator_id = 8, .data_root = data_root_3 }; - const gop = try payloads_map.getOrPut(key); - if (!gop.found_existing) { - gop.value_ptr.* = AggregatedPayloadsList.init(allocator); - } - try gop.value_ptr.append(.{ .slot = att_data_3.slot, .proof = proof }); - } - - // Execute var agg_ctx = try AggregatedAttestationsResult.init(allocator); defer agg_ctx.deinit(); - try agg_ctx.computeAggregatedSignatures( - &attestations_list, - &ctx.validators, - &signatures_map, + try agg_ctx.selectAggregatedProofs( + attestations_list[0..], &payloads_map, ); - // Expected results: - // - Group 1: 1 attestation from signatures_map {0,1,2} - // - Group 2: 1 attestation from stored {3,4,5} - // - Group 3: 2 attestations - stored {7,8,9} + signatures_map {6} (7 excluded from signatures_map) - // Total: 4 attestations - try std.testing.expectEqual(@as(usize, 4), agg_ctx.attestations.len()); - try std.testing.expectEqual(@as(usize, 4), agg_ctx.attestation_signatures.len()); + try std.testing.expectEqual(@as(usize, 2), agg_ctx.attestations.len()); + try std.testing.expectEqual(@as(usize, 2), agg_ctx.attestation_signatures.len()); - // Verify each group - var found_0_1_2 = false; - var found_3_4_5 = false; - var found_7_8_9 = false; - var found_6 = false; + var found_0_1 = false; + var found_2_3 = false; for (0..agg_ctx.attestations.len()) |i| { const att_bits = &(try agg_ctx.attestations.get(i)).aggregation_bits; - if (try TestContext.checkParticipants(att_bits, &[_]ValidatorIndex{ 0, 1, 2 })) { - found_0_1_2 = true; - } - if (try TestContext.checkParticipants(att_bits, &[_]ValidatorIndex{ 3, 4, 5 })) { - found_3_4_5 = true; - } - if (try TestContext.checkParticipants(att_bits, &[_]ValidatorIndex{ 7, 8, 9 })) { - found_7_8_9 = true; + if (try TestContext.checkParticipants(att_bits, &[_]ValidatorIndex{ 0, 1 })) { + found_0_1 = true; } - if (try TestContext.checkParticipants(att_bits, &[_]ValidatorIndex{6})) { - found_6 = true; + if (try TestContext.checkParticipants(att_bits, &[_]ValidatorIndex{ 2, 3 })) { + found_2_3 = true; } } - try std.testing.expect(found_0_1_2); // Group 1: pure signatures_map - try std.testing.expect(found_3_4_5); // Group 2: pure stored - try std.testing.expect(found_7_8_9); // Group 3: stored proof - try std.testing.expect(found_6); // Group 3: remaining signatures_map (7 excluded) + try std.testing.expect(found_0_1); + try std.testing.expect(found_2_3); } // ============================================================================ -// Test 11: Validator without signature is excluded -// signatures_map {1} + aggregated_payload {2,3} = attestations {1} + {2,3}, validator 4 excluded +// selectAggregatedProofs: proof can include extra participants // ============================================================================ -test "computeAggregatedSignatures: validator without signature excluded" { +test "selectAggregatedProofs: proof can include extra participants" { const allocator = std.testing.allocator; var ctx = try TestContext.init(allocator, 5); defer ctx.deinit(); - // Create attestations for validators 1, 2, 3, 4 var attestations_list = [_]attestation.Attestation{ ctx.createAttestation(1), ctx.createAttestation(2), - ctx.createAttestation(3), - ctx.createAttestation(4), }; - // Add signature only for validator 1 to signatures_map - var signatures_map = SignaturesMap.init(allocator); - defer deinitSignaturesMap(&signatures_map); - - try ctx.addToSignatureMap(&signatures_map, 1); - - // Create aggregated proof for validators 2, 3 only var payloads_map = AggregatedPayloadsMap.init(allocator); defer deinitPayloadsMap(&payloads_map); - const proof_2_3 = try ctx.createAggregatedProof(&[_]ValidatorIndex{ 2, 3 }); - try ctx.addAggregatedPayload(&payloads_map, 2, proof_2_3); + const proof_1_2_3_4 = try ctx.createAggregatedProof(&[_]ValidatorIndex{ 1, 2, 3, 4 }); + try ctx.addAggregatedPayload(&payloads_map, 1, proof_1_2_3_4); - // Create aggregation context and compute var agg_ctx = try AggregatedAttestationsResult.init(allocator); defer agg_ctx.deinit(); - try agg_ctx.computeAggregatedSignatures( - &attestations_list, - &ctx.validators, - &signatures_map, + try agg_ctx.selectAggregatedProofs( + attestations_list[0..], &payloads_map, ); - // Should have exactly 2 aggregated attestations: - // 1. signatures_map for validator 1 - // 2. Aggregated proof for validators 2, 3 - // Validator 4 should be excluded (no signature available) - try std.testing.expectEqual(@as(usize, 2), agg_ctx.attestations.len()); - try std.testing.expectEqual(@as(usize, 2), agg_ctx.attestation_signatures.len()); - - // Verify one covers {1} and one covers {2, 3} - var found_1 = false; - var found_2_3 = false; - - for (0..agg_ctx.attestations.len()) |i| { - const att_bits = &(try agg_ctx.attestations.get(i)).aggregation_bits; - - // Check for validator 1 only - if (try TestContext.checkParticipants(att_bits, &[_]ValidatorIndex{1})) { - found_1 = true; - } - // Check for validators 2, 3 - if (try TestContext.checkParticipants(att_bits, &[_]ValidatorIndex{ 2, 3 })) { - found_2_3 = true; - } - - // Verify validator 4 is NOT included in any attestation - // If the bitlist has fewer than 5 elements, validator 4 can't be included - if (att_bits.len() > 4) { - try std.testing.expect(!(try att_bits.get(4))); - } - } + try std.testing.expectEqual(@as(usize, 1), agg_ctx.attestations.len()); + try std.testing.expectEqual(@as(usize, 1), agg_ctx.attestation_signatures.len()); - try std.testing.expect(found_1); - try std.testing.expect(found_2_3); + const att_bits = &(try agg_ctx.attestations.get(0)).aggregation_bits; + try std.testing.expect(try TestContext.checkParticipants(att_bits, &[_]ValidatorIndex{ 1, 2, 3, 4 })); } // ============================================================================ -// Test 12: Single attestation lookup key with all validators in aggregated payload -// Attestations for validators 1,2 nothing in signatures_map, -// aggregated_payload {1,2,3,4} indexed by validator 1 => all bits set -// Validators 3 and 4 are included although not covered by attestations_list +// selectAggregatedProofs: empty payloads yields empty // ============================================================================ -test "computeAggregatedSignatures: empty signatures_map with full aggregated payload" { +test "selectAggregatedProofs: empty payloads yields empty" { const allocator = std.testing.allocator; - var ctx = try TestContext.init(allocator, 5); + var ctx = try TestContext.init(allocator, 2); defer ctx.deinit(); - // Create attestations for validators 1, 2 var attestations_list = [_]attestation.Attestation{ + ctx.createAttestation(0), ctx.createAttestation(1), - ctx.createAttestation(2), }; - // Empty signatures_map - nothing found while iterating - var signatures_map = SignaturesMap.init(allocator); - defer deinitSignaturesMap(&signatures_map); - - // Create aggregated proof for validators 1, 2, 3, 4 indexed by validator 1 var payloads_map = AggregatedPayloadsMap.init(allocator); defer deinitPayloadsMap(&payloads_map); - const proof_1_2_3_4 = try ctx.createAggregatedProof(&[_]ValidatorIndex{ 1, 2, 3, 4 }); - try ctx.addAggregatedPayload(&payloads_map, 1, proof_1_2_3_4); - - // Create aggregation context and compute var agg_ctx = try AggregatedAttestationsResult.init(allocator); defer agg_ctx.deinit(); - try agg_ctx.computeAggregatedSignatures( - &attestations_list, - &ctx.validators, - &signatures_map, + try agg_ctx.selectAggregatedProofs( + attestations_list[0..], &payloads_map, ); - // Should have exactly 1 aggregated attestation covering all 4 validators - try std.testing.expectEqual(@as(usize, 1), agg_ctx.attestations.len()); - try std.testing.expectEqual(@as(usize, 1), agg_ctx.attestation_signatures.len()); - - // Verify attestation_bits are set for validators 1, 2, 3, 4 - const att_bits = &(try agg_ctx.attestations.get(0)).aggregation_bits; - try std.testing.expect(try TestContext.checkParticipants(att_bits, &[_]ValidatorIndex{ 1, 2, 3, 4 })); + try std.testing.expectEqual(@as(usize, 0), agg_ctx.attestations.len()); + try std.testing.expectEqual(@as(usize, 0), agg_ctx.attestation_signatures.len()); } diff --git a/pkgs/types/src/lib.zig b/pkgs/types/src/lib.zig index 8f1dc381..b3aca6d2 100644 --- a/pkgs/types/src/lib.zig +++ b/pkgs/types/src/lib.zig @@ -6,6 +6,7 @@ pub const AggregationBits = attestation.AggregationBits; pub const AttestationData = attestation.AttestationData; pub const Attestation = attestation.Attestation; pub const SignedAttestation = attestation.SignedAttestation; +pub const SignedAggregatedAttestation = attestation.SignedAggregatedAttestation; pub const AggregatedAttestation = attestation.AggregatedAttestation; pub const aggregationBitsEnsureLength = attestation.aggregationBitsEnsureLength; pub const aggregationBitsSet = attestation.aggregationBitsSet;