Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion p2p/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (n *p2pNode) eachPublish(network *P2P, msg Message) {
receivedEdges = append(receivedEdges, senderID)
}

targets := msg.CustomProtocol(msg, allEdges, sentEdges, receivedEdges, network.cfg.CustomParams)
targets := msg.CustomProtocol(n.id, msg, allEdges, sentEdges, receivedEdges, network.cfg.CustomParams)

for _, targetID := range *targets {
for _, edge := range n.edges {
Expand Down
33 changes: 30 additions & 3 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type P2P struct {

// GenerateP2P creates a P2P network from the given graph.
// nodeLatency and edgeLatency are functions that generate latencies for nodes and edges respectively.
func GenerateP2P(g *graph.Graph, nodeLatency, edgeLatency func() float64, cfg *Config) (*P2P, error) {
func GenerateP2P(g *graph.Graph, nodeLatency func(src PeerID) float64, edgeLatency func(src PeerID, dst PeerID) float64, cfg *Config) (*P2P, error) {
nodes := make(map[PeerID]*p2pNode)
maps := make(map[graph.NodeID]PeerID)

Expand All @@ -31,7 +31,7 @@ func GenerateP2P(g *graph.Graph, nodeLatency, edgeLatency func() float64, cfg *C
return nil, err
}

n := newNode(PeerID(num), nodeLatency())
n := newNode(PeerID(num), nodeLatency(PeerID(num)))
n.edges = make(map[PeerID]p2pEdge)

nodes[n.id] = n
Expand All @@ -52,7 +52,7 @@ func GenerateP2P(g *graph.Graph, nodeLatency, edgeLatency func() float64, cfg *C

edge := p2pEdge{
targetID: PeerID(j),
edgeLatency: edgeLatency(),
edgeLatency: edgeLatency(PeerID(num), PeerID(j)),
}

n.edges[edge.targetID] = edge
Expand All @@ -74,6 +74,33 @@ func (p *P2P) SimulateP2P(ctx context.Context) {
wg.Wait()
}

// ExpireSimulation runs the simulation until the reachability of the specified message stabilizes or a timeout occurs.
func (p *P2P) ExpireSimulation(cancel context.CancelFunc, msg string, expirationDuration, timeoutDuration, checkInterval time.Duration) {
startTime := time.Now()
lastChangeTime := startTime
beforeRch := p.Reachability(msg)

for {
currentRch := p.Reachability(msg)

if currentRch > beforeRch {
beforeRch = currentRch
lastChangeTime = time.Now()
}

if time.Since(lastChangeTime) > expirationDuration {
break
}
if time.Since(startTime) > timeoutDuration {
break
}

time.Sleep(checkInterval)
}

cancel()
}

// PeerIDs returns a slice of all node IDs in the network.
func (p *P2P) PeerIDs() []PeerID {
ids := make([]PeerID, 0, len(p.nodes))
Expand Down
8 changes: 4 additions & 4 deletions p2p/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ func TestGenerateNetwork(t *testing.T) {
g := sg.ErdosRenyiGraph(1000, 50.000/1000, true)
t.Logf("Generated graph with %d nodes and %d edges\n", len(g.Nodes()), g.EdgeCount())

nodeLatency := func() float64 { return p2p.LogNormalRand(math.Log(100), 0.5) }
edgeLatency := func() float64 { return p2p.LogNormalRand(math.Log(100), 0.3) }
nodeLatency := func(id p2p.PeerID) float64 { return p2p.LogNormalRand(math.Log(100), 0.5) }
edgeLatency := func(id1, id2 p2p.PeerID) float64 { return p2p.LogNormalRand(math.Log(100), 0.3) }

nw, err := p2p.GenerateP2P(g, nodeLatency, edgeLatency, &p2p.Config{GossipFactor: 0.35})
if err != nil {
Expand Down Expand Up @@ -87,8 +87,8 @@ func TestMetrics(t *testing.T) {
g := sg.ErdosRenyiGraph(1000, 50.000/1000, true)
t.Logf("Generated graph with %d nodes and %d edges\n", len(g.Nodes()), g.EdgeCount())

nodeLatency := func() float64 { return p2p.LogNormalRand(math.Log(100), 0.5) }
edgeLatency := func() float64 { return p2p.LogNormalRand(math.Log(100), 0.3) }
nodeLatency := func(id p2p.PeerID) float64 { return p2p.LogNormalRand(math.Log(100), 0.5) }
edgeLatency := func(id1, id2 p2p.PeerID) float64 { return p2p.LogNormalRand(math.Log(100), 0.3) }

nw, err := p2p.GenerateP2P(g, nodeLatency, edgeLatency, &p2p.Config{GossipFactor: 0.35})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion p2p/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ type Config struct {
CustomParams map[string]any // parameters for custom protocols
}

type CustomProtocolFunc func(msg Message, neighbours []PeerID, sentPeers []PeerID, receivedPeers []PeerID, customParams map[string]any) *[]PeerID
type CustomProtocolFunc func(id PeerID, msg Message, neighbours []PeerID, sentPeers []PeerID, receivedPeers []PeerID, customParams map[string]any) *[]PeerID
Loading