Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
8cfeda1
WIP gossipsub controller
aarshkshah1992 Nov 3, 2025
7853cb9
changelog fragment
aarshkshah1992 Nov 3, 2025
ce3660d
finish draft
aarshkshah1992 Nov 4, 2025
e145906
bazel gazelle
aarshkshah1992 Nov 4, 2025
14158be
Merge branch 'develop' into feat/use-topic-abstraction-for-gossipsub-…
aarshkshah1992 Nov 4, 2025
ab2e836
fix test
aarshkshah1992 Nov 4, 2025
3e98537
fix test
aarshkshah1992 Nov 5, 2025
452d42b
fix test in sync
aarshkshah1992 Nov 5, 2025
af2522e
fix schedule
aarshkshah1992 Nov 5, 2025
8dfbabc
fork watcher test works
aarshkshah1992 Nov 5, 2025
9327105
more tests
aarshkshah1992 Nov 5, 2025
798376b
fix bazel
aarshkshah1992 Nov 5, 2025
a15a1ad
fix test
aarshkshah1992 Nov 5, 2025
3f73714
fix test in sync
aarshkshah1992 Nov 5, 2025
614367d
fix lint
aarshkshah1992 Nov 5, 2025
71050ab
fix bazel
aarshkshah1992 Nov 6, 2025
5161f08
fix compilation
aarshkshah1992 Nov 6, 2025
fa7596b
first draft
aarshkshah1992 Nov 12, 2025
14dca40
draft
aarshkshah1992 Nov 12, 2025
9291156
draft
aarshkshah1992 Nov 12, 2025
1397a79
changelog
aarshkshah1992 Nov 12, 2025
9b07f13
tests for the crawler
aarshkshah1992 Nov 13, 2025
61628ef
run CI
aarshkshah1992 Nov 13, 2025
63279bc
revert new line change
aarshkshah1992 Nov 13, 2025
08f038f
fix lint
aarshkshah1992 Nov 13, 2025
1ff836e
peer controller first draft
aarshkshah1992 Nov 17, 2025
4a6d88d
finish all changes
aarshkshah1992 Nov 21, 2025
672de43
finish all changes
aarshkshah1992 Nov 21, 2025
76975a1
fix build
aarshkshah1992 Nov 21, 2025
707abe6
fix mock peer manager
aarshkshah1992 Nov 21, 2025
41c9f16
fix bazel
aarshkshah1992 Nov 21, 2025
893cf60
fix tests
aarshkshah1992 Nov 21, 2025
e9dac06
finish tests
aarshkshah1992 Nov 21, 2025
86b65e0
re-run test
aarshkshah1992 Nov 21, 2025
e95676d
fix conflicts
aarshkshah1992 Nov 24, 2025
e6f3b63
fix lint
aarshkshah1992 Nov 24, 2025
10804bb
fix conflicts
aarshkshah1992 Nov 24, 2025
11c5c6f
fix dynamic families
aarshkshah1992 Nov 24, 2025
09d886c
fix test
aarshkshah1992 Nov 24, 2025
eb556ad
Merge branch 'develop' into feat/gossipsub-control-pane-peer-crawler-…
aarshkshah1992 Nov 24, 2025
affdab7
fix changelog
aarshkshah1992 Nov 24, 2025
eb93350
Merge remote-tracking branch 'origin/feat/gossipsub-control-pane-peer…
aarshkshah1992 Nov 24, 2025
2061fc8
design doc
aarshkshah1992 Nov 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ go_library(
"fork_watcher.go",
"gossip_scoring_params.go",
"gossip_topic_mappings.go",
"gossipsub_peer_controller.go",
"gossipsub_peer_crawler.go",
"handshake.go",
"info.go",
"interfaces.go",
Expand Down Expand Up @@ -51,6 +53,7 @@ go_library(
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/kv:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/gossipsubcrawler:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/peerdata:go_default_library",
"//beacon-chain/p2p/peers/scorers:go_default_library",
Expand Down Expand Up @@ -115,6 +118,7 @@ go_library(
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
"@org_golang_x_sync//semaphore:go_default_library",
],
)

Expand All @@ -130,6 +134,8 @@ go_test(
"fork_test.go",
"gossip_scoring_params_test.go",
"gossip_topic_mappings_test.go",
"gossipsub_peer_controller_test.go",
"gossipsub_peer_crawler_test.go",
"message_id_test.go",
"options_test.go",
"parameter_test.go",
Expand All @@ -154,9 +160,11 @@ go_test(
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/iface:go_default_library",
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/gossipsubcrawler:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/peerdata:go_default_library",
"//beacon-chain/p2p/peers/scorers:go_default_library",
Expand Down Expand Up @@ -205,6 +213,7 @@ go_test(
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
"@com_github_stretchr_testify//require:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
],
)
69 changes: 24 additions & 45 deletions beacon-chain/p2p/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
if !ok {
return errors.Errorf("message of %T does not support marshaller interface", msg)
}
return s.broadcastObject(ctx, castMsg, fmt.Sprintf(topic, forkDigest))

fullTopic := fmt.Sprintf(topic, forkDigest) + s.Encoding().ProtocolSuffix()

return s.broadcastObject(ctx, castMsg, fullTopic)
}

// BroadcastAttestation broadcasts an attestation to the p2p network, the message is assumed to be
Expand Down Expand Up @@ -106,6 +109,7 @@ func (s *Service) BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint
}

func (s *Service) internalBroadcastAttestation(ctx context.Context, subnet uint64, att ethpb.Att, forkDigest [fieldparams.VersionLength]byte) {
topic := AttestationSubnetTopic(forkDigest, subnet)
_, span := trace.StartSpan(ctx, "p2p.internalBroadcastAttestation")
defer span.End()
ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline.
Expand All @@ -116,7 +120,7 @@ func (s *Service) internalBroadcastAttestation(ctx context.Context, subnet uint6

// Ensure we have peers with this subnet.
s.subnetLocker(subnet).RLock()
hasPeer := s.hasPeerWithSubnet(attestationToTopic(subnet, forkDigest))
hasPeer := s.hasPeerWithTopic(topic)
s.subnetLocker(subnet).RUnlock()

span.SetAttributes(
Expand All @@ -131,7 +135,7 @@ func (s *Service) internalBroadcastAttestation(ctx context.Context, subnet uint6
s.subnetLocker(subnet).Lock()
defer s.subnetLocker(subnet).Unlock()

if err := s.FindAndDialPeersWithSubnets(ctx, AttestationSubnetTopicFormat, forkDigest, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil {
if err := s.gossipsubDialer.DialPeersForTopicBlocking(ctx, topic, minimumPeersPerSubnetForBroadcast); err != nil {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kasey has some good ideas on how to do async broadcast for data column topics which are starved of peers.

return errors.Wrap(err, "find peers with subnets")
}

Expand All @@ -153,13 +157,14 @@ func (s *Service) internalBroadcastAttestation(ctx context.Context, subnet uint6
return
}

if err := s.broadcastObject(ctx, att, attestationToTopic(subnet, forkDigest)); err != nil {
if err := s.broadcastObject(ctx, att, topic); err != nil {
log.WithError(err).Error("Failed to broadcast attestation")
tracing.AnnotateError(span, err)
}
}

func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage, forkDigest [fieldparams.VersionLength]byte) {
topic := SyncCommitteeSubnetTopic(forkDigest, subnet)
_, span := trace.StartSpan(ctx, "p2p.broadcastSyncCommittee")
defer span.End()
ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline.
Expand All @@ -173,7 +178,7 @@ func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMs
// to ensure that we can reuse the same subnet locker.
wrappedSubIdx := subnet + syncLockerVal
s.subnetLocker(wrappedSubIdx).RLock()
hasPeer := s.hasPeerWithSubnet(syncCommitteeToTopic(subnet, forkDigest))
hasPeer := s.hasPeerWithTopic(topic)
s.subnetLocker(wrappedSubIdx).RUnlock()

span.SetAttributes(
Expand All @@ -187,7 +192,7 @@ func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMs
if err := func() error {
s.subnetLocker(wrappedSubIdx).Lock()
defer s.subnetLocker(wrappedSubIdx).Unlock()
if err := s.FindAndDialPeersWithSubnets(ctx, SyncCommitteeSubnetTopicFormat, forkDigest, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil {
if err := s.gossipsubDialer.DialPeersForTopicBlocking(ctx, topic, minimumPeersPerSubnetForBroadcast); err != nil {
return errors.Wrap(err, "find peers with subnets")
}

Expand All @@ -205,7 +210,7 @@ func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMs
return
}

if err := s.broadcastObject(ctx, sMsg, syncCommitteeToTopic(subnet, forkDigest)); err != nil {
if err := s.broadcastObject(ctx, sMsg, topic); err != nil {
log.WithError(err).Error("Failed to broadcast sync committee message")
tracing.AnnotateError(span, err)
}
Expand Down Expand Up @@ -233,6 +238,7 @@ func (s *Service) BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.
}

func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blobSidecar *ethpb.BlobSidecar, forkDigest [fieldparams.VersionLength]byte) {
topic := BlobSubnetTopic(forkDigest, subnet)
_, span := trace.StartSpan(ctx, "p2p.internalBroadcastBlob")
defer span.End()
ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline.
Expand All @@ -243,7 +249,7 @@ func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blob

wrappedSubIdx := subnet + blobSubnetLockerVal
s.subnetLocker(wrappedSubIdx).RLock()
hasPeer := s.hasPeerWithSubnet(blobSubnetToTopic(subnet, forkDigest))
hasPeer := s.hasPeerWithTopic(topic)
s.subnetLocker(wrappedSubIdx).RUnlock()

if !hasPeer {
Expand All @@ -252,7 +258,7 @@ func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blob
s.subnetLocker(wrappedSubIdx).Lock()
defer s.subnetLocker(wrappedSubIdx).Unlock()

if err := s.FindAndDialPeersWithSubnets(ctx, BlobSubnetTopicFormat, forkDigest, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil {
if err := s.gossipsubDialer.DialPeersForTopicBlocking(ctx, topic, minimumPeersPerSubnetForBroadcast); err != nil {
return errors.Wrap(err, "find peers with subnets")
}

Expand All @@ -264,7 +270,7 @@ func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blob
}
}

if err := s.broadcastObject(ctx, blobSidecar, blobSubnetToTopic(subnet, forkDigest)); err != nil {
if err := s.broadcastObject(ctx, blobSidecar, topic); err != nil {
log.WithError(err).Error("Failed to broadcast blob sidecar")
tracing.AnnotateError(span, err)
}
Expand Down Expand Up @@ -293,7 +299,7 @@ func (s *Service) BroadcastLightClientOptimisticUpdate(ctx context.Context, upda
}

digest := params.ForkDigest(slots.ToEpoch(update.AttestedHeader().Beacon().Slot))
if err := s.broadcastObject(ctx, update, lcOptimisticToTopic(digest)); err != nil {
if err := s.broadcastObject(ctx, update, LcOptimisticToTopic(digest)); err != nil {
log.WithError(err).Debug("Failed to broadcast light client optimistic update")
err := errors.Wrap(err, "could not publish message")
tracing.AnnotateError(span, err)
Expand Down Expand Up @@ -327,7 +333,7 @@ func (s *Service) BroadcastLightClientFinalityUpdate(ctx context.Context, update
}

forkDigest := params.ForkDigest(slots.ToEpoch(update.AttestedHeader().Beacon().Slot))
if err := s.broadcastObject(ctx, update, lcFinalityToTopic(forkDigest)); err != nil {
if err := s.broadcastObject(ctx, update, LcFinalityToTopic(forkDigest)); err != nil {
log.WithError(err).Debug("Failed to broadcast light client finality update")
err := errors.Wrap(err, "could not publish message")
tracing.AnnotateError(span, err)
Expand Down Expand Up @@ -386,13 +392,14 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
subnet := peerdas.ComputeSubnetForDataColumnSidecar(sidecar.Index)

// Build the topic corresponding to subnet column subnet and this fork digest.
topic := dataColumnSubnetToTopic(subnet, forkDigest)
topic := DataColumnSubnetTopic(forkDigest, subnet)

// Compute the wrapped subnet index.
wrappedSubIdx := subnet + dataColumnSubnetVal

// Find peers if needed.
if err := s.findPeersIfNeeded(ctx, wrappedSubIdx, DataColumnSubnetTopicFormat, forkDigest, subnet); err != nil {

if err := s.findPeersIfNeeded(ctx, wrappedSubIdx, topic); err != nil {
tracing.AnnotateError(span, err)
log.WithError(err).Error("Cannot find peers if needed")
return
Expand Down Expand Up @@ -487,20 +494,16 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
func (s *Service) findPeersIfNeeded(
ctx context.Context,
wrappedSubIdx uint64,
topicFormat string,
forkDigest [fieldparams.VersionLength]byte,
subnet uint64,
topic string,
) error {
// Sending a data column sidecar to only one peer is not ideal,
// but it ensures at least one peer receives it.
s.subnetLocker(wrappedSubIdx).Lock()
defer s.subnetLocker(wrappedSubIdx).Unlock()

// No peers found, attempt to find peers with this subnet.
if err := s.FindAndDialPeersWithSubnets(ctx, topicFormat, forkDigest, minimumPeersPerSubnetForBroadcast, map[uint64]bool{subnet: true}); err != nil {
if err := s.gossipsubDialer.DialPeersForTopicBlocking(ctx, topic, minimumPeersPerSubnetForBroadcast); err != nil {
return errors.Wrap(err, "find peers with subnet")
}

return nil
}

Expand All @@ -525,34 +528,10 @@ func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic
iid := int64(id)
span = trace.AddMessageSendEvent(span, iid, messageLen /*uncompressed*/, messageLen /*compressed*/)
}
if err := s.PublishToTopic(ctx, topic+s.Encoding().ProtocolSuffix(), buf.Bytes()); err != nil {
if err := s.PublishToTopic(ctx, topic, buf.Bytes()); err != nil {
err := errors.Wrap(err, "could not publish message")
tracing.AnnotateError(span, err)
return err
}
return nil
}

func attestationToTopic(subnet uint64, forkDigest [fieldparams.VersionLength]byte) string {
return fmt.Sprintf(AttestationSubnetTopicFormat, forkDigest, subnet)
}

func syncCommitteeToTopic(subnet uint64, forkDigest [fieldparams.VersionLength]byte) string {
return fmt.Sprintf(SyncCommitteeSubnetTopicFormat, forkDigest, subnet)
}

func blobSubnetToTopic(subnet uint64, forkDigest [fieldparams.VersionLength]byte) string {
return fmt.Sprintf(BlobSubnetTopicFormat, forkDigest, subnet)
}

func lcOptimisticToTopic(forkDigest [4]byte) string {
return fmt.Sprintf(LightClientOptimisticUpdateTopicFormat, forkDigest)
}

func lcFinalityToTopic(forkDigest [4]byte) string {
return fmt.Sprintf(LightClientFinalityUpdateTopicFormat, forkDigest)
}

func dataColumnSubnetToTopic(subnet uint64, forkDigest [fieldparams.VersionLength]byte) string {
return fmt.Sprintf(DataColumnSubnetTopicFormat, forkDigest, subnet)
}
49 changes: 40 additions & 9 deletions beacon-chain/p2p/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/OffchainLabs/prysm/v7/testing/require"
"github.com/OffchainLabs/prysm/v7/testing/util"
"github.com/OffchainLabs/prysm/v7/time/slots"
"github.com/ethereum/go-ethereum/p2p/enode"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -226,6 +227,7 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
// Setup bootnode.
cfg := &Config{PingInterval: testPingInterval, DB: db}
cfg.UDPPort = uint(port)
cfg.TCPPort = uint(port)
_, pkey := createAddrAndPrivKey(t)
ipAddr := net.ParseIP("127.0.0.1")
genesisTime := time.Now()
Expand All @@ -251,8 +253,9 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {

var listeners []*listenerWrapper
var hosts []host.Host
var configs []*Config
// setup other nodes.
cfg = &Config{
baseCfg := &Config{
Discv5BootStrapAddrs: []string{bootNode.String()},
MaxPeers: 2,
PingInterval: testPingInterval,
Expand All @@ -261,11 +264,21 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
// Setup 2 different hosts
for i := uint(1); i <= 2; i++ {
h, pkey, ipAddr := createHost(t, port+i)
cfg.UDPPort = uint(port + i)
cfg.TCPPort = uint(port + i)

// Create a new config for each service to avoid shared mutations
cfg := &Config{
Discv5BootStrapAddrs: baseCfg.Discv5BootStrapAddrs,
MaxPeers: baseCfg.MaxPeers,
PingInterval: baseCfg.PingInterval,
DB: baseCfg.DB,
UDPPort: uint(port + i),
TCPPort: uint(port + i),
}

if len(listeners) > 0 {
cfg.Discv5BootStrapAddrs = append(cfg.Discv5BootStrapAddrs, listeners[len(listeners)-1].Self().String())
}

s := &Service{
cfg: cfg,
genesisTime: genesisTime,
Expand All @@ -278,18 +291,22 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
close(s.custodyInfoSet)

listener, err := s.startDiscoveryV5(ipAddr, pkey)
// Set for 2nd peer
assert.NoError(t, err, "Could not start discovery for node")

// Set listener for the service
s.dv5Listener = listener
s.metaData = wrapper.WrappedMetadataV0(new(ethpb.MetaDataV0))

// Set subnet for 2nd peer
if i == 2 {
s.dv5Listener = listener
s.metaData = wrapper.WrappedMetadataV0(new(ethpb.MetaDataV0))
bitV := bitfield.NewBitvector64()
bitV.SetBitAt(subnet, true)
err := s.updateSubnetRecordWithMetadata(bitV)
require.NoError(t, err)
}
assert.NoError(t, err, "Could not start discovery for node")
listeners = append(listeners, listener)
hosts = append(hosts, h)
configs = append(configs, cfg)
}
defer func() {
// Close down all peers.
Expand Down Expand Up @@ -324,7 +341,7 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
pubsub: ps1,
dv5Listener: listeners[0],
joinedTopics: map[string]*pubsub.Topic{},
cfg: cfg,
cfg: configs[0],
genesisTime: time.Now(),
genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
subnetsLock: make(map[uint64]*sync.RWMutex),
Expand All @@ -340,7 +357,7 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
pubsub: ps2,
dv5Listener: listeners[1],
joinedTopics: map[string]*pubsub.Topic{},
cfg: cfg,
cfg: configs[1],
genesisTime: time.Now(),
genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
subnetsLock: make(map[uint64]*sync.RWMutex),
Expand Down Expand Up @@ -762,6 +779,16 @@ func TestService_BroadcastDataColumn(t *testing.T) {
subnet := peerdas.ComputeSubnetForDataColumnSidecar(columnIndex)
topic := fmt.Sprintf(topicFormat, digest, subnet) + service.Encoding().ProtocolSuffix()

crawler, err := NewGossipsubPeerCrawler(service, listener, 1*time.Second, 1*time.Second, 10,
func(n *enode.Node) bool { return true },
service.Peers().Scorers().Score)
require.NoError(t, err)
err = crawler.Start(func(ctx context.Context, node *enode.Node) ([]string, error) {
return []string{topic}, nil
})
require.NoError(t, err)
service.gossipsubDialer = NewGossipsubPeerDialer(crawler, service.PubSub().ListPeers, service.DialPeers)

_, verifiedRoSidecars := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{{Index: columnIndex}})
verifiedRoSidecar := verifiedRoSidecars[0]

Expand All @@ -787,3 +814,7 @@ func TestService_BroadcastDataColumn(t *testing.T) {
require.NoError(t, service.Encoding().DecodeGossip(msg.Data, &result))
require.DeepEqual(t, &result, verifiedRoSidecar)
}

func attestationToTopic(subnet uint64, forkDigest [fieldparams.VersionLength]byte) string {
return fmt.Sprintf(AttestationSubnetTopicFormat, forkDigest, subnet)
}
Loading
Loading