diff --git a/p2p/node.go b/p2p/node.go index 725857c..fa91690 100644 --- a/p2p/node.go +++ b/p2p/node.go @@ -137,7 +137,7 @@ func (n *p2pNode) eachPublish(network *P2P, msg Message) { receivedEdges = append(receivedEdges, senderID) } - targets := msg.CustomProtocol(msg, allEdges, sentEdges, receivedEdges, network.cfg.CustomParams) + targets := msg.CustomProtocol(n.id, msg, allEdges, sentEdges, receivedEdges, network.cfg.CustomParams) for _, targetID := range *targets { for _, edge := range n.edges { diff --git a/p2p/p2p.go b/p2p/p2p.go index f355da0..2cd0778 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -19,7 +19,7 @@ type P2P struct { // GenerateP2P creates a P2P network from the given graph. // nodeLatency and edgeLatency are functions that generate latencies for nodes and edges respectively. -func GenerateP2P(g *graph.Graph, nodeLatency, edgeLatency func() float64, cfg *Config) (*P2P, error) { +func GenerateP2P(g *graph.Graph, nodeLatency func(src PeerID) float64, edgeLatency func(src PeerID, dst PeerID) float64, cfg *Config) (*P2P, error) { nodes := make(map[PeerID]*p2pNode) maps := make(map[graph.NodeID]PeerID) @@ -31,7 +31,7 @@ func GenerateP2P(g *graph.Graph, nodeLatency, edgeLatency func() float64, cfg *C return nil, err } - n := newNode(PeerID(num), nodeLatency()) + n := newNode(PeerID(num), nodeLatency(PeerID(num))) n.edges = make(map[PeerID]p2pEdge) nodes[n.id] = n @@ -52,7 +52,7 @@ func GenerateP2P(g *graph.Graph, nodeLatency, edgeLatency func() float64, cfg *C edge := p2pEdge{ targetID: PeerID(j), - edgeLatency: edgeLatency(), + edgeLatency: edgeLatency(PeerID(num), PeerID(j)), } n.edges[edge.targetID] = edge @@ -74,6 +74,33 @@ func (p *P2P) SimulateP2P(ctx context.Context) { wg.Wait() } +// ExpireSimulation runs the simulation until the reachability of the specified message stabilizes or a timeout occurs. +func (p *P2P) ExpireSimulation(cancel context.CancelFunc, msg string, expirationDuration, timeoutDuration, checkInterval time.Duration) { + startTime := time.Now() + lastChangeTime := startTime + beforeRch := p.Reachability(msg) + + for { + currentRch := p.Reachability(msg) + + if currentRch > beforeRch { + beforeRch = currentRch + lastChangeTime = time.Now() + } + + if time.Since(lastChangeTime) > expirationDuration { + break + } + if time.Since(startTime) > timeoutDuration { + break + } + + time.Sleep(checkInterval) + } + + cancel() +} + // PeerIDs returns a slice of all node IDs in the network. func (p *P2P) PeerIDs() []PeerID { ids := make([]PeerID, 0, len(p.nodes)) diff --git a/p2p/p2p_test.go b/p2p/p2p_test.go index 55b05ec..0859614 100644 --- a/p2p/p2p_test.go +++ b/p2p/p2p_test.go @@ -18,8 +18,8 @@ func TestGenerateNetwork(t *testing.T) { g := sg.ErdosRenyiGraph(1000, 50.000/1000, true) t.Logf("Generated graph with %d nodes and %d edges\n", len(g.Nodes()), g.EdgeCount()) - nodeLatency := func() float64 { return p2p.LogNormalRand(math.Log(100), 0.5) } - edgeLatency := func() float64 { return p2p.LogNormalRand(math.Log(100), 0.3) } + nodeLatency := func(id p2p.PeerID) float64 { return p2p.LogNormalRand(math.Log(100), 0.5) } + edgeLatency := func(id1, id2 p2p.PeerID) float64 { return p2p.LogNormalRand(math.Log(100), 0.3) } nw, err := p2p.GenerateP2P(g, nodeLatency, edgeLatency, &p2p.Config{GossipFactor: 0.35}) if err != nil { @@ -87,8 +87,8 @@ func TestMetrics(t *testing.T) { g := sg.ErdosRenyiGraph(1000, 50.000/1000, true) t.Logf("Generated graph with %d nodes and %d edges\n", len(g.Nodes()), g.EdgeCount()) - nodeLatency := func() float64 { return p2p.LogNormalRand(math.Log(100), 0.5) } - edgeLatency := func() float64 { return p2p.LogNormalRand(math.Log(100), 0.3) } + nodeLatency := func(id p2p.PeerID) float64 { return p2p.LogNormalRand(math.Log(100), 0.5) } + edgeLatency := func(id1, id2 p2p.PeerID) float64 { return p2p.LogNormalRand(math.Log(100), 0.3) } nw, err := p2p.GenerateP2P(g, nodeLatency, edgeLatency, &p2p.Config{GossipFactor: 0.35}) if err != nil { diff --git a/p2p/type.go b/p2p/type.go index 8751e42..fb11378 100644 --- a/p2p/type.go +++ b/p2p/type.go @@ -18,4 +18,4 @@ type Config struct { CustomParams map[string]any // parameters for custom protocols } -type CustomProtocolFunc func(msg Message, neighbours []PeerID, sentPeers []PeerID, receivedPeers []PeerID, customParams map[string]any) *[]PeerID +type CustomProtocolFunc func(id PeerID, msg Message, neighbours []PeerID, sentPeers []PeerID, receivedPeers []PeerID, customParams map[string]any) *[]PeerID