Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .gitcore/features.json
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@
"name": "Libp2p P2P Sync",
"phase": 4,
"crate": "synapse-infra",
"passes": false,
"tests": [],
"last_updated": null
"passes": true,
"tests": ["crates/synapse-infra/src/adapters/libp2p_sync_adapter.rs", "crates/synapse-infra/tests/sync_adapter_test.rs"],
"last_updated": "2026-01-14"
},
{
"id": "encryption",
Expand Down
124 changes: 107 additions & 17 deletions crates/synapse-infra/src/adapters/libp2p_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use futures::StreamExt;
use libp2p::{
gossipsub::{self, IdentTopic, MessageAuthenticity, ValidationMode},
identity::Keypair,
kad::{self, store::MemoryStore},
mdns, noise,
swarm::NetworkBehaviour,
tcp, yamux, PeerId, SwarmBuilder,
Expand All @@ -31,18 +32,20 @@ use tracing::{error, info, trace, warn};
/// The topic for HoloPacket gossip.
const HOLO_TOPIC: &str = "synapse/holo/1.0";

/// Custom network behaviour that combines Gossipsub and mDNS.
/// Custom network behaviour that combines Gossipsub, mDNS and Kademlia.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "SynapseBehaviourEvent")]
struct SynapseBehaviour {
gossipsub: gossipsub::Behaviour,
mdns: mdns::tokio::Behaviour,
kad: kad::Behaviour<MemoryStore>,
}

#[derive(Debug)]
enum SynapseBehaviourEvent {
Gossipsub(gossipsub::Event),
Mdns(mdns::Event),
Kad(kad::Event),
}

impl From<gossipsub::Event> for SynapseBehaviourEvent {
Expand All @@ -57,6 +60,12 @@ impl From<mdns::Event> for SynapseBehaviourEvent {
}
}

impl From<kad::Event> for SynapseBehaviourEvent {
fn from(event: kad::Event) -> Self {
SynapseBehaviourEvent::Kad(event)
}
}


/// The Libp2p Adapter for HoloPacket communication.
pub struct Libp2pAdapter {
Expand All @@ -68,9 +77,10 @@ pub struct Libp2pAdapter {
}

/// Commands sent to the swarm loop.
#[derive(Debug)]
enum Command {
Broadcast(HoloPacket),
DhtPut(Vec<u8>, Vec<u8>, tokio::sync::oneshot::Sender<SynapseResult<()>>),
DhtGet(Vec<u8>, tokio::sync::oneshot::Sender<SynapseResult<Option<Vec<u8>>>>),
}

impl Libp2pAdapter {
Expand Down Expand Up @@ -118,7 +128,9 @@ impl Libp2pAdapter {
)
.map_err(|e| Error::System(format!("Gossipsub behaviour error: {}", e)))?;

let behaviour = SynapseBehaviour { gossipsub, mdns };
let kad = kad::Behaviour::new(local_peer_id, MemoryStore::new(local_peer_id));

let behaviour = SynapseBehaviour { gossipsub, mdns, kad };
SwarmBuilder::with_new_identity()
.with_tokio()
.with_tcp(
Expand Down Expand Up @@ -154,34 +166,63 @@ impl Libp2pAdapter {

let peers_clone = peers.clone();
let listening_addresses_clone = listening_addresses.clone();
let mut pending_gets = std::collections::HashMap::new();
let mut pending_puts = std::collections::HashMap::new();

tokio::spawn(async move {
loop {
tokio::select! {
Some(Command::Broadcast(packet)) = command_receiver.recv() => {
match serde_json::to_vec(&packet) {
Ok(json) => {
if let Err(e) = swarm.behaviour_mut().gossipsub.publish(holo_topic.clone(), json) {
error!("Failed to publish HoloPacket: {:?}", e);
Some(cmd) = command_receiver.recv() => {
match cmd {
Command::Broadcast(packet) => {
match serde_json::to_vec(&packet) {
Ok(json) => {
if let Err(e) = swarm.behaviour_mut().gossipsub.publish(holo_topic.clone(), json) {
error!("Failed to publish HoloPacket: {:?}", e);
}
}
Err(e) => error!("Failed to serialize HoloPacket: {}", e),
}
}
Err(e) => error!("Failed to serialize HoloPacket: {}", e),
Command::DhtPut(key, value, sender) => {
let record = kad::Record {
key: kad::RecordKey::new(&key),
value,
publisher: None,
expires: None,
};
match swarm.behaviour_mut().kad.put_record(record, kad::Quorum::One) {
Ok(query_id) => {
pending_puts.insert(query_id, sender);
}
Err(e) => {
let _ = sender.send(Err(Error::System(format!("DHT put error: {:?}", e))));
}
}
}
Command::DhtGet(key, sender) => {
let query_id = swarm.behaviour_mut().kad.get_record(kad::RecordKey::new(&key));
pending_gets.insert(query_id, sender);
}
}
}
event = swarm.select_next_some() => {
match event {
libp2p::swarm::SwarmEvent::Behaviour(event) => match event {
SynapseBehaviourEvent::Mdns(mdns::Event::Discovered(list)) => {
for (peer_id, _multiaddr) in list {
trace!("mDNS discovered: {}", peer_id);
for (peer_id, multiaddr) in list {
trace!("mDNS discovered: {} at {}", peer_id, multiaddr);
peers_clone.write().unwrap().insert(peer_id);
swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
swarm.behaviour_mut().kad.add_address(&peer_id, multiaddr);
}
}
SynapseBehaviourEvent::Mdns(mdns::Event::Expired(list)) => {
for (peer_id, _multiaddr) in list {
trace!("mDNS expired: {}", peer_id);
for (peer_id, multiaddr) in list {
trace!("mDNS expired: {} at {}", peer_id, multiaddr);
peers_clone.write().unwrap().remove(&peer_id);
swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id);
swarm.behaviour_mut().kad.remove_address(&peer_id, &multiaddr);
}
}
SynapseBehaviourEvent::Gossipsub(gossipsub::Event::Message { message, .. }) => {
Expand All @@ -208,6 +249,45 @@ impl Libp2pAdapter {
Err(e) => warn!("Failed to deserialize HoloPacket: {}", e),
}
}
SynapseBehaviourEvent::Kad(kad::Event::OutboundQueryProgressed { id, result, .. }) => {
match result {
kad::QueryResult::GetRecord(Ok(ok)) => {
if let Some(sender) = pending_gets.remove(&id) {
match ok {
kad::GetRecordOk::FoundRecord(peer_record) => {
let _ = sender.send(Ok(Some(peer_record.record.value)));
}
_ => {
let _ = sender.send(Ok(None));
}
}
}
}
kad::QueryResult::GetRecord(Err(e)) => {
if let Some(sender) = pending_gets.remove(&id) {
match e {
kad::GetRecordError::NotFound { .. } => {
let _ = sender.send(Ok(None));
}
_ => {
let _ = sender.send(Err(Error::System(format!("DHT get error: {:?}", e))));
}
}
}
}
kad::QueryResult::PutRecord(Ok(_)) => {
if let Some(sender) = pending_puts.remove(&id) {
let _ = sender.send(Ok(()));
}
}
kad::QueryResult::PutRecord(Err(e)) => {
if let Some(sender) = pending_puts.remove(&id) {
let _ = sender.send(Err(Error::System(format!("DHT put error: {:?}", e))));
}
}
_ => {}
}
Comment on lines +253 to +289

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for handling Kademlia query results is quite nested and repeats the if let Some(sender) = ... check. This can be refactored to be more linear and easier to read by handling GetRecord and PutRecord results in a more consolidated way. This improves readability and maintainability.

                                    match result {
                                        kad::QueryResult::GetRecord(result) => {
                                            if let Some(sender) = pending_gets.remove(&id) {
                                                let response = match result {
                                                    Ok(kad::GetRecordOk::FoundRecord(peer_record)) => Ok(Some(peer_record.record.value)),
                                                    Ok(_) => Ok(None),
                                                    Err(kad::GetRecordError::NotFound { .. }) => Ok(None),
                                                    Err(e) => Err(Error::System(format!("DHT get error: {:?}", e))),
                                                };
                                                let _ = sender.send(response);
                                            }
                                        }
                                        kad::QueryResult::PutRecord(result) => {
                                            if let Some(sender) = pending_puts.remove(&id) {
                                                let response = result.map(|_| ()).map_err(|e| Error::System(format!("DHT put error: {:?}", e)));
                                                let _ = sender.send(response);
                                            }
                                        }
                                        _ => {}
                                    }

}
_ => {}
},
libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
Expand Down Expand Up @@ -273,12 +353,22 @@ impl NetworkPort for Libp2pAdapter {
Ok(self.local_peer_id.to_string())
}

async fn dht_put(&self, _key: Vec<u8>, _value: Vec<u8>) -> SynapseResult<()> {
Err(Error::System("DHT not implemented for legacy Libp2pAdapter".into()))
async fn dht_put(&self, key: Vec<u8>, value: Vec<u8>) -> SynapseResult<()> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.command_sender
.send(Command::DhtPut(key, value, tx))
.await
.map_err(|e| Error::System(format!("Failed to send DHT put command: {}", e)))?;
rx.await.map_err(|e| Error::System(format!("DHT put response channel error: {}", e)))?
}

async fn dht_get(&self, _key: Vec<u8>) -> SynapseResult<Option<Vec<u8>>> {
Err(Error::System("DHT not implemented for legacy Libp2pAdapter".into()))
async fn dht_get(&self, key: Vec<u8>) -> SynapseResult<Option<Vec<u8>>> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.command_sender
.send(Command::DhtGet(key, tx))
.await
.map_err(|e| Error::System(format!("Failed to send DHT get command: {}", e)))?;
rx.await.map_err(|e| Error::System(format!("DHT get response channel error: {}", e)))?
}

async fn listening_addresses(&self) -> SynapseResult<Vec<String>> {
Expand Down
87 changes: 83 additions & 4 deletions crates/synapse-infra/src/adapters/libp2p_sync_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,115 @@
use std::sync::Arc;
use synapse_core::{
error::{Error, Result},
ports::{network_port::NetworkPort, peer_port::PeerStoragePort},
ports::{network_port::NetworkPort, peer_port::PeerStoragePort, memory_port::MemoryPort},
};
use tracing::{info, warn};

/// The prefix for peer records in the DHT.
use tracing::{info, warn, error};
use tokio::time::{self, Duration};
use synapse_core::ports::peer_port::PeerInfo;

const PEER_DHT_PREFIX: &str = "synapse/peer/";
const MEMORY_SYNC_PREFIX: &str = "synapse/memory/sync/";

fn dht_key(peer_id: &str) -> Vec<u8> {
format!("{}{}", PEER_DHT_PREFIX, peer_id).into_bytes()
}

fn memory_sync_key(peer_id: &str) -> Vec<u8> {
format!("{}{}", MEMORY_SYNC_PREFIX, peer_id).into_bytes()
}

pub struct Libp2pSyncAdapter {
network_port: Arc<dyn NetworkPort>,
peer_storage: Arc<dyn PeerStoragePort>,
memory_storage: Arc<dyn MemoryPort>,
shutdown_tx: tokio::sync::watch::Sender<bool>,
}

impl Libp2pSyncAdapter {
/// Creates a new `Libp2pSyncAdapter`.
pub fn new(
network_port: Arc<dyn NetworkPort>,
peer_storage: Arc<dyn PeerStoragePort>,
memory_storage: Arc<dyn MemoryPort>,
) -> Self {
let (shutdown_tx, _) = tokio::sync::watch::channel(false);
Self {
network_port,
peer_storage,
memory_storage,
shutdown_tx,
}
}

/// Starts the periodic synchronization process in a background task.
pub fn start(self: Arc<Self>) {
let adapter = self.clone();
let mut shutdown_rx = self.shutdown_tx.subscribe();
tokio::spawn(async move {
let mut interval = time::interval(Duration::from_secs(60));
loop {
tokio::select! {
_ = interval.tick() => {
if let Err(e) = adapter.sync_peers().await {
error!("Peer sync failed: {}", e);
}
if let Err(e) = adapter.sync_memories().await {
error!("Memory sync failed: {}", e);
}
}
_ = shutdown_rx.changed() => {
if *shutdown_rx.borrow() {
info!("Shutting down sync adapter background task");
break;
}
}
}
}
});
}

pub fn shutdown(&self) {
let _ = self.shutdown_tx.send(true);
}
Comment on lines +49 to +77
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find all call sites for Libp2pSyncAdapter creation, start, and shutdown
rg -n -C3 'Libp2pSyncAdapter::new|\.start\(|\.shutdown\(' crates

Repository: iberi22/synapse-protocol

Length of output: 3726


🌐 Web query:

tokio sync watch receiver semantics: what happens when a receiver subscribes after sender has already sent a value

💡 Result:

In tokio::sync::watch, a receiver that subscribes after a value has already been sent will:

  • Immediately see the latest value when it calls rx.borrow() / rx.borrow_and_update() (watch only retains the most recent value). (docs.rs)
  • Consider that current value “seen” at subscribe time, meaning rx.changed().await will not complete for that already-sent value; it will wait for the next send after the subscription. (docs.rs)

So: late subscribers don’t get a “change notification” for the current value, but they can still read it immediately. (docs.rs)

Citations:


🏁 Script executed:

#!/bin/bash
# Check the complete Libp2pSyncAdapter struct definition and instantiation
rg -n -A20 'impl Libp2pSyncAdapter' crates/synapse-infra/src/adapters/libp2p_sync_adapter.rs | head -80

Repository: iberi22/synapse-protocol

Length of output: 811


🏁 Script executed:

#!/bin/bash
# Check if start() is actually called anywhere in production (not tests)
rg -n '\.start\(' crates --glob='!*test*' --glob='!**/tests/**'

Repository: iberi22/synapse-protocol

Length of output: 144


Guard start() against multiple calls and add restart capability or document single-call semantics.

The start() method lacks idempotence guards: calling it multiple times spawns duplicate background tasks. Additionally, the one-way watch channel makes restart impossible after shutdown()—a late subscriber will see the true value immediately but won't trigger changed(), causing the task to exit on re-entry.

Either add a flag to track whether start() has been called (rejecting duplicates), make the adapter reusable with a reset mechanism, or document that start() must only be called once per instance.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/synapse-infra/src/adapters/libp2p_sync_adapter.rs` around lines 49 -
77, The start() method can be called multiple times and will spawn duplicate
background tasks; add an idempotence guard by introducing an AtomicBool (e.g.,
started) on the adapter struct and in start() do a compare_exchange(false, true)
to atomically mark started and return early (or log/Err) if it was already true;
keep existing shutdown() behavior using shutdown_tx and do not change
sync_peers()/sync_memories() — this prevents duplicate tokio::spawn tasks and
documents single-call semantics for start().


/// Synchronizes memory metadata over the DHT.
pub async fn sync_memories(&self) -> Result<()> {
info!("Starting memory sync process");

// 1. Publish our latest memory update timestamp
let local_peer_id = self.network_port.local_peer_id().await?;
let last_node = self.memory_storage.fetch_last_node().await?;
let last_update = last_node.map(|n| n.updated_at).unwrap_or(0);

let key = memory_sync_key(&local_peer_id);
let value = last_update.to_be_bytes().to_vec();
self.network_port.dht_put(key, value).await?;

// 2. Check connected peers' memory timestamps
let connected_peers = self.network_port.connected_peers().await?;
for peer_id in connected_peers {
if peer_id == local_peer_id {
continue;
}

let key = memory_sync_key(&peer_id);
if let Ok(Some(value)) = self.network_port.dht_get(key).await {
if value.len() == 8 {
let mut bytes = [0u8; 8];
bytes.copy_from_slice(&value);
let peer_last_update = i64::from_be_bytes(bytes);

if peer_last_update > last_update {
info!("Peer {} has newer memories ({} > {}). In a full implementation, we would trigger a reconciliation.", peer_id, peer_last_update, last_update);
}
}
Comment on lines +101 to +109

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic to convert the Vec<u8> to [u8; 8] can be made more concise and idiomatic by using try_from. This avoids manually checking the length and then copying the slice, making the code safer and cleaner.

                if let Ok(bytes) = <[u8; 8]>::try_from(value.as_slice()) {
                    let peer_last_update = i64::from_be_bytes(bytes);

                    if peer_last_update > last_update {
                        info!("Peer {} has newer memories ({} > {}). In a full implementation, we would trigger a reconciliation.", peer_id, peer_last_update, last_update);
                    }
                }

}
Comment on lines +99 to +110
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Don’t silently drop DHT read failures in memory sync.

The current if let Ok(Some(value)) = ... path suppresses DHT errors and malformed payloads, so sync can report success while peer metadata reads are failing.

Suggested patch
-            let key = memory_sync_key(&peer_id);
-            if let Ok(Some(value)) = self.network_port.dht_get(key).await {
-                if value.len() == 8 {
-                    let mut bytes = [0u8; 8];
-                    bytes.copy_from_slice(&value);
-                    let peer_last_update = i64::from_be_bytes(bytes);
-
-                    if peer_last_update > last_update {
-                        info!("Peer {} has newer memories ({} > {}). In a full implementation, we would trigger a reconciliation.", peer_id, peer_last_update, last_update);
-                    }
-                }
-            }
+            let key = memory_sync_key(&peer_id);
+            match self.network_port.dht_get(key).await {
+                Ok(Some(value)) if value.len() == 8 => {
+                    let mut bytes = [0u8; 8];
+                    bytes.copy_from_slice(&value);
+                    let peer_last_update = i64::from_be_bytes(bytes);
+
+                    if peer_last_update > last_update {
+                        info!("Peer {} has newer memories ({} > {}). In a full implementation, we would trigger a reconciliation.", peer_id, peer_last_update, last_update);
+                    }
+                }
+                Ok(Some(value)) => {
+                    warn!("Invalid memory sync payload size from {}: {}", peer_id, value.len());
+                }
+                Ok(None) => {}
+                Err(e) => {
+                    warn!("Failed to read memory sync marker for {}: {}", peer_id, e);
+                }
+            }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let key = memory_sync_key(&peer_id);
if let Ok(Some(value)) = self.network_port.dht_get(key).await {
if value.len() == 8 {
let mut bytes = [0u8; 8];
bytes.copy_from_slice(&value);
let peer_last_update = i64::from_be_bytes(bytes);
if peer_last_update > last_update {
info!("Peer {} has newer memories ({} > {}). In a full implementation, we would trigger a reconciliation.", peer_id, peer_last_update, last_update);
}
}
}
let key = memory_sync_key(&peer_id);
match self.network_port.dht_get(key).await {
Ok(Some(value)) if value.len() == 8 => {
let mut bytes = [0u8; 8];
bytes.copy_from_slice(&value);
let peer_last_update = i64::from_be_bytes(bytes);
if peer_last_update > last_update {
info!("Peer {} has newer memories ({} > {}). In a full implementation, we would trigger a reconciliation.", peer_id, peer_last_update, last_update);
}
}
Ok(Some(value)) => {
warn!("Invalid memory sync payload size from {}: {}", peer_id, value.len());
}
Ok(None) => {}
Err(e) => {
warn!("Failed to read memory sync marker for {}: {}", peer_id, e);
}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/synapse-infra/src/adapters/libp2p_sync_adapter.rs` around lines 99 -
110, The DHT read currently ignores errors and malformed payloads because it
uses if let Ok(Some(value)) = self.network_port.dht_get(key).await; change this
to match on the Result from self.network_port.dht_get(key).await so you
explicitly handle Err(e) (log error with the peer_id and e), handle Ok(None)
(log debug/warn that no value was found), and for Ok(Some(value)) validate
value.len() == 8 else log a warning about malformed payload before attempting
bytes.copy_from_slice; keep the existing peer_last_update comparison and log,
but ensure all failure cases are surfaced via logging (and return/propagate an
error if appropriate for the surrounding memory sync flow).

}

info!("Memory sync process completed");
Ok(())
}

/// Synchronizes peer information between local storage and the DHT.
pub async fn sync_peers(&self) -> Result<()> {
info!("Starting peer sync process");
Expand Down
Loading
Loading