diff --git a/Cargo.lock b/Cargo.lock index 21f687963..c486ae1a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3057,6 +3057,7 @@ dependencies = [ "chrono", "deadpool-redis", "dirs", + "futures", "neo4rs", "opentelemetry 0.31.0", "opentelemetry-appender-tracing", @@ -3075,6 +3076,7 @@ dependencies = [ "tracing", "tracing-log", "tracing-subscriber", + "tracing-test", "utoipa", ] @@ -3155,6 +3157,7 @@ dependencies = [ "chrono", "clap", "dirs", + "futures", "neo4rs", "nexus-common", "nexus-watcher", @@ -6169,6 +6172,27 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "tracing-test" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19a4c448db514d4f24c5ddb9f73f2ee71bfb24c526cf0c570ba142d1119e0051" +dependencies = [ + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad06847b7afb65c7866a36664b75c40b895e318cea4f71299f013fb22965329d" +dependencies = [ + "quote", + "syn 2.0.115", +] + [[package]] name = "try-lock" version = "0.2.5" diff --git a/Cargo.toml b/Cargo.toml index fca315035..c137d7b2a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,3 +32,5 @@ thiserror = "2.0.17" tokio = { version = "1.49.0", features = ["full"] } tokio-shared-rt = "0.1" tracing = "0.1.44" +tracing-test = "0.2" +futures = "0.3" diff --git a/examples/api/api-config.toml b/examples/api/api-config.toml index e273a4e99..467ce884d 100644 --- a/examples/api/api-config.toml +++ b/examples/api/api-config.toml @@ -18,4 +18,10 @@ redis = "redis://127.0.0.1:6379" uri = "bolt://localhost:7687" # Not needed in the Community Edition #user = "neo4j" -password = "12345678" \ No newline at end of file +password = "12345678" +# Queries taking longer than this (ms) will be logged as warnings +slow_query_logging_threshold_ms = 100 +# Enable or disable slow query logging +#slow_query_logging_enabled = true +# 
Include the full cypher (with interpolated params) in slow-query warnings +#slow_query_logging_include_cypher = false diff --git a/examples/watcher/watcher-config.toml b/examples/watcher/watcher-config.toml index 35de107f9..7fd99abf9 100644 --- a/examples/watcher/watcher-config.toml +++ b/examples/watcher/watcher-config.toml @@ -31,3 +31,9 @@ uri = "bolt://localhost:7687" # Not needed in the Community Edition #user = "neo4j" password = "12345678" +# Queries taking longer than this (ms) will be logged as warnings +slow_query_logging_threshold_ms = 100 +# Enable or disable slow query logging +#slow_query_logging_enabled = true +# Include the full cypher (with interpolated params) in slow-query warnings +#slow_query_logging_include_cypher = false diff --git a/nexus-common/Cargo.toml b/nexus-common/Cargo.toml index 7d1a16cfd..4a424cff9 100644 --- a/nexus-common/Cargo.toml +++ b/nexus-common/Cargo.toml @@ -11,6 +11,7 @@ license = "MIT" async-trait = { workspace = true } dirs = "6.0.0" chrono = { workspace = true } +futures = { workspace = true } neo4rs = { workspace = true } opentelemetry = { workspace = true } opentelemetry-appender-tracing = "0.31.1" @@ -33,3 +34,4 @@ utoipa = "5.4.0" [dev-dependencies] tempfile = { workspace = true } tokio-shared-rt = { workspace = true } +tracing-test = { workspace = true } diff --git a/nexus-common/default.config.toml b/nexus-common/default.config.toml index 6caae7f27..dfbb40520 100644 --- a/nexus-common/default.config.toml +++ b/nexus-common/default.config.toml @@ -51,3 +51,9 @@ uri = "bolt://localhost:7687" # Not needed in the Community Edition the profile username, just the password #user = "neo4j" password = "12345678" +# Queries taking longer than this (ms) will be logged as warnings +slow_query_logging_threshold_ms = 100 +# Enable or disable slow query logging +slow_query_logging_enabled = true +# Include the full cypher (with interpolated params) in slow-query warnings +#slow_query_logging_include_cypher = false diff --git 
a/nexus-common/src/db/config/mod.rs b/nexus-common/src/db/config/mod.rs index 134725091..5e4952bbd 100644 --- a/nexus-common/src/db/config/mod.rs +++ b/nexus-common/src/db/config/mod.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use std::fmt::Debug; mod neo4j; -pub use neo4j::Neo4JConfig; +pub use neo4j::{Neo4JConfig, DEFAULT_SLOW_QUERY_THRESHOLD_MS}; pub const REDIS_URI: &str = "redis://localhost:6379"; diff --git a/nexus-common/src/db/config/neo4j.rs b/nexus-common/src/db/config/neo4j.rs index 1e9ec56cb..ef15950f3 100644 --- a/nexus-common/src/db/config/neo4j.rs +++ b/nexus-common/src/db/config/neo4j.rs @@ -3,26 +3,55 @@ use serde::{Deserialize, Serialize}; pub const NEO4J_URI: &str = "bolt://localhost:7687"; pub const NEO4J_USER: &str = "neo4j"; pub const NEO4J_PASS: &str = "12345678"; +pub const DEFAULT_SLOW_QUERY_THRESHOLD_MS: u64 = 100; // Create temporal struct to wrap database config #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct Neo4JConfig { pub uri: String, + #[serde(default = "default_neo4j_user")] pub user: String, + pub password: String, + + /// Queries exceeding this threshold (in milliseconds) are logged as warnings. + /// Only used when `slow_query_logging_enabled` is true. + #[serde(default = "default_slow_query_logging_threshold_ms")] + pub slow_query_logging_threshold_ms: u64, + + /// Enable slow-query logging. Defaults to true. + /// Set to false for CLI/admin commands where tracing overhead is unnecessary. + #[serde(default = "default_slow_query_logging_enabled")] + pub slow_query_logging_enabled: bool, + + /// Include the full cypher (with interpolated params) in slow-query warnings. + /// Useful for debugging but verbose. Defaults to false. 
+ #[serde(default)] + pub slow_query_logging_include_cypher: bool, } fn default_neo4j_user() -> String { String::from("neo4j") } +fn default_slow_query_logging_threshold_ms() -> u64 { + DEFAULT_SLOW_QUERY_THRESHOLD_MS +} + +fn default_slow_query_logging_enabled() -> bool { + true +} + impl Default for Neo4JConfig { fn default() -> Self { Self { uri: String::from(NEO4J_URI), user: String::from(NEO4J_USER), password: String::from(NEO4J_PASS), + slow_query_logging_threshold_ms: DEFAULT_SLOW_QUERY_THRESHOLD_MS, + slow_query_logging_enabled: true, + slow_query_logging_include_cypher: false, } } } diff --git a/nexus-common/src/db/connectors/neo4j.rs b/nexus-common/src/db/connectors/neo4j.rs index fec9c2d9a..72ed4a721 100644 --- a/nexus-common/src/db/connectors/neo4j.rs +++ b/nexus-common/src/db/connectors/neo4j.rs @@ -1,26 +1,23 @@ -use neo4rs::{query, Graph}; +use crate::db::graph::Query; use std::fmt; -use std::sync::OnceLock; +use std::sync::{Arc, OnceLock}; +use std::time::Duration; use tracing::{debug, info}; use crate::db::graph::error::{GraphError, GraphResult}; +use crate::db::graph::{Graph, GraphOps, TracedGraph}; use crate::db::setup::setup_graph; use crate::db::Neo4JConfig; use crate::types::DynError; pub struct Neo4jConnector { - pub graph: Graph, + graph: Arc, } impl Neo4jConnector { /// Initialize and register the global Neo4j connector and verify connectivity pub async fn init(neo4j_config: &Neo4JConfig) -> Result<(), DynError> { - let neo4j_connector = Neo4jConnector::new_connection( - &neo4j_config.uri, - &neo4j_config.user, - &neo4j_config.password, - ) - .await?; + let neo4j_connector = Neo4jConnector::new_connection(neo4j_config).await?; neo4j_connector.ping(&neo4j_config.uri).await?; @@ -35,17 +32,28 @@ impl Neo4jConnector { } /// Create and return a new connector after defining a database connection - async fn new_connection(uri: &str, user: &str, password: &str) -> GraphResult { - let graph = Graph::new(uri, user, password).await?; - let 
neo4j_connector = Neo4jConnector { graph }; - info!("Created Neo4j connector"); + async fn new_connection(config: &Neo4JConfig) -> GraphResult { + let neo4j_graph = neo4rs::Graph::new(&config.uri, &config.user, &config.password).await?; + let graph = Graph::new(neo4j_graph); + + let graph: Arc = if config.slow_query_logging_enabled { + let threshold = Duration::from_millis(config.slow_query_logging_threshold_ms); + Arc::new( + TracedGraph::new(graph) + .with_slow_query_threshold(threshold) + .with_log_cypher(config.slow_query_logging_include_cypher), + ) + } else { + Arc::new(graph) + }; - Ok(neo4j_connector) + info!("Created Neo4j connector"); + Ok(Neo4jConnector { graph }) } /// Perform a health-check PING over the Bolt protocol to the Neo4j server async fn ping(&self, neo4j_uri: &str) -> Result<(), DynError> { - if let Err(neo4j_err) = self.graph.execute(query("RETURN 1")).await { + if let Err(neo4j_err) = self.graph.run(Query::new("ping", "RETURN 1")).await { return Err(format!("Failed to PING to Neo4j at {neo4j_uri}, {neo4j_err}").into()); } @@ -57,13 +65,13 @@ impl Neo4jConnector { impl fmt::Debug for Neo4jConnector { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Neo4jConnector") - .field("graph", &"Graph instance") + .field("graph", &"GraphOps instance") .finish() } } /// Helper to retrieve a Neo4j graph connection. 
-pub fn get_neo4j_graph() -> GraphResult { +pub fn get_neo4j_graph() -> GraphResult> { NEO4J_CONNECTOR .get() .ok_or(GraphError::ConnectionNotInitialized) diff --git a/nexus-common/src/db/graph/exec.rs b/nexus-common/src/db/graph/exec.rs index ed577acbd..c8c170f33 100644 --- a/nexus-common/src/db/graph/exec.rs +++ b/nexus-common/src/db/graph/exec.rs @@ -1,5 +1,7 @@ +use super::query::Query; use crate::db::{get_neo4j_graph, graph::error::GraphResult}; -use neo4rs::{Query, Row}; +use futures::TryStreamExt; +use neo4rs::Row; use serde::de::DeserializeOwned; /// Represents the outcome of a mutation-like query in the graph database. @@ -37,7 +39,7 @@ pub async fn execute_graph_operation(query: Query) -> GraphResult GraphResult<()> { let graph = get_neo4j_graph()?; let mut result = graph.execute(query).await?; - result.next().await?; + result.try_next().await?; Ok(()) } @@ -46,20 +48,15 @@ pub async fn fetch_row_from_graph(query: Query) -> GraphResult> { let mut result = graph.execute(query).await?; - result.next().await.map_err(Into::into) + result.try_next().await.map_err(Into::into) } pub async fn fetch_all_rows_from_graph(query: Query) -> GraphResult> { let graph = get_neo4j_graph()?; - let mut result = graph.execute(query).await?; - let mut rows = Vec::new(); - - while let Some(row) = result.next().await? 
{ - rows.push(row); - } + let result = graph.execute(query).await?; - Ok(rows) + result.try_collect().await.map_err(Into::into) } /// Fetch the value of type T mapped to a specific key from the first row of a graph query's result diff --git a/nexus-common/src/db/graph/mod.rs b/nexus-common/src/db/graph/mod.rs index 4e73eb72e..7133cb7f6 100644 --- a/nexus-common/src/db/graph/mod.rs +++ b/nexus-common/src/db/graph/mod.rs @@ -1,6 +1,11 @@ pub mod error; pub mod exec; pub mod queries; +mod query; pub mod setup; +mod traced; pub use error::{GraphError, GraphResult}; +pub use query::Query; +pub use traced::GraphOps; +pub(crate) use traced::{Graph, TracedGraph}; diff --git a/nexus-common/src/db/graph/queries/del.rs b/nexus-common/src/db/graph/queries/del.rs index a69e1834d..92c184f24 100644 --- a/nexus-common/src/db/graph/queries/del.rs +++ b/nexus-common/src/db/graph/queries/del.rs @@ -1,10 +1,11 @@ -use neo4rs::{query, Query}; +use crate::db::graph::Query; /// Deletes a user node and all its relationships /// # Arguments /// * `user_id` - The unique identifier of the user to be deleted pub fn delete_user(user_id: &str) -> Query { - query( + Query::new( + "delete_user", "MATCH (u:User {id: $id}) DETACH DELETE u;", ) @@ -16,7 +17,8 @@ pub fn delete_user(user_id: &str) -> Query { /// * `author_id` - The unique identifier of the user who authored the post. /// * `post_id` - The unique identifier of the post to be deleted. pub fn delete_post(author_id: &str, post_id: &str) -> Query { - query( + Query::new( + "delete_post", "MATCH (u:User {id: $author_id})-[:AUTHORED]->(p:Post {id: $post_id}) DETACH DELETE p;", ) @@ -29,11 +31,12 @@ pub fn delete_post(author_id: &str, post_id: &str) -> Query { /// * `follower_id` - The unique identifier of the user who is following another user. 
/// * `followee_id` - The unique identifier of the user being followed pub fn delete_follow(follower_id: &str, followee_id: &str) -> Query { - query( + Query::new( + "delete_follow", "// Important that MATCH to check if both users are in the graph MATCH (follower:User {id: $follower_id}), (followee:User {id: $followee_id}) // Check if follow already exist - OPTIONAL MATCH (follower)-[existing:FOLLOWS]->(followee) + OPTIONAL MATCH (follower)-[existing:FOLLOWS]->(followee) DELETE existing // Returns true if the relationship does not exist as 'flag' RETURN existing IS NULL AS flag;", @@ -47,7 +50,8 @@ pub fn delete_follow(follower_id: &str, followee_id: &str) -> Query { /// * `user_id` - The unique identifier of the user who created the bookmark. /// * `bookmark_id` - The unique identifier of the bookmark relationship to be deleted. pub fn delete_bookmark(user_id: &str, bookmark_id: &str) -> Query { - query( + Query::new( + "delete_bookmark", "MATCH (u:User {id: $user_id})-[b:BOOKMARKED {id: $bookmark_id}]->(post:Post)<-[:AUTHORED]-(author:User) WITH post.id as post_id, author.id as author_id, b DELETE b @@ -62,7 +66,8 @@ pub fn delete_bookmark(user_id: &str, bookmark_id: &str) -> Query { /// * `user_id` - The unique identifier of the user who created the tag. /// * `tag_id` - The unique identifier of the `TAGGED` relationship to be deleted. 
pub fn delete_tag(user_id: &str, tag_id: &str) -> Query { - query( + Query::new( + "delete_tag", "MATCH (user:User {id: $user_id})-[tag:TAGGED {id: $tag_id}]->(target) OPTIONAL MATCH (target)<-[:AUTHORED]-(author:User) WITH CASE WHEN target:User THEN target.id ELSE null END AS user_id, @@ -82,7 +87,8 @@ pub fn delete_tag(user_id: &str, tag_id: &str) -> Query { /// * `owner_id` - The unique identifier of the user who owns the file /// * `file_id` - The unique identifier of the file to be deleted pub fn delete_file(owner_id: &str, file_id: &str) -> Query { - query( + Query::new( + "delete_file", "MATCH (f:File {id: $id, owner_id: $owner_id}) DETACH DELETE f;", ) diff --git a/nexus-common/src/db/graph/queries/get.rs b/nexus-common/src/db/graph/queries/get.rs index bbace1f84..7bc7a0b8d 100644 --- a/nexus-common/src/db/graph/queries/get.rs +++ b/nexus-common/src/db/graph/queries/get.rs @@ -1,15 +1,16 @@ +use crate::db::graph::Query; use crate::models::post::StreamSource; use crate::types::routes::HotTagsInputDTO; use crate::types::Pagination; use crate::types::StreamReach; use crate::types::StreamSorting; use crate::types::Timeframe; -use neo4rs::{query, Query}; use pubky_app_specs::PubkyAppPostKind; // Retrieve post node by post id and author id pub fn get_post_by_id(author_id: &str, post_id: &str) -> Query { - query( + Query::new( + "get_post_by_id", " MATCH (u:User {id: $author_id})-[:AUTHORED]->(p:Post {id: $post_id}) OPTIONAL MATCH (p)-[replied:REPLIED]->(parent_post:Post)<-[:AUTHORED]-(author:User) @@ -34,19 +35,20 @@ pub fn get_post_by_id(author_id: &str, post_id: &str) -> Query { } pub fn post_counts(author_id: &str, post_id: &str) -> Query { - query( + Query::new( + "post_counts", " - MATCH (u:User {id: $author_id})-[:AUTHORED]->(p:Post {id: $post_id}) - WITH p - OPTIONAL MATCH (p)<-[t:TAGGED]-() - WITH p, COUNT (t) AS tags_count, COUNT(DISTINCT t.label) AS unique_tags_count - RETURN p IS NOT NULL AS exists, - { - tags: tags_count, - unique_tags: 
unique_tags_count, - replies: COUNT { (p)<-[:REPLIED]-() }, - reposts: COUNT { (p)<-[:REPOSTED]-() } - } AS counts, + MATCH (u:User {id: $author_id})-[:AUTHORED]->(p:Post {id: $post_id}) + WITH p + OPTIONAL MATCH (p)<-[t:TAGGED]-() + WITH p, COUNT (t) AS tags_count, COUNT(DISTINCT t.label) AS unique_tags_count + RETURN p IS NOT NULL AS exists, + { + tags: tags_count, + unique_tags: unique_tags_count, + replies: COUNT { (p)<-[:REPLIED]-() }, + reposts: COUNT { (p)<-[:REPOSTED]-() } + } AS counts, EXISTS { (p)-[:REPLIED]->(:Post) } AS is_reply ", ) @@ -56,7 +58,8 @@ pub fn post_counts(author_id: &str, post_id: &str) -> Query { // Check if the viewer_id has a bookmark in the post pub fn post_bookmark(author_id: &str, post_id: &str, viewer_id: &str) -> Query { - query( + Query::new( + "post_bookmark", "MATCH (u:User {id: $author_id})-[:AUTHORED]->(p:Post {id: $post_id}) MATCH (viewer:User {id: $viewer_id})-[b:BOOKMARKED]->(p) RETURN b", @@ -68,7 +71,8 @@ pub fn post_bookmark(author_id: &str, post_id: &str, viewer_id: &str) -> Query { // Check all the bookmarks that user creates pub fn user_bookmarks(user_id: &str) -> Query { - query( + Query::new( + "user_bookmarks", "MATCH (u:User {id: $user_id})-[b:BOOKMARKED]->(p:Post)<-[:AUTHORED]-(author:User) RETURN b, p.id AS post_id, author.id AS author_id", ) @@ -77,7 +81,8 @@ pub fn user_bookmarks(user_id: &str) -> Query { // Get all the bookmarks that a post has received (used for edit/delete notifications) pub fn get_post_bookmarks(author_id: &str, post_id: &str) -> Query { - query( + Query::new( + "get_post_bookmarks", "MATCH (bookmarker:User)-[b:BOOKMARKED]->(p:Post {id: $post_id})<-[:AUTHORED]-(author:User {id: $author_id}) RETURN b.id AS bookmark_id, bookmarker.id AS bookmarker_id", ) @@ -87,7 +92,8 @@ pub fn get_post_bookmarks(author_id: &str, post_id: &str) -> Query { // Get all the reposts that a post has received (used for edit/delete notifications) pub fn get_post_reposts(author_id: &str, post_id: &str) -> Query { 
- query( + Query::new( + "get_post_reposts", "MATCH (reposter:User)-[:AUTHORED]->(repost:Post)-[:REPOSTED]->(p:Post {id: $post_id})<-[:AUTHORED]-(author:User {id: $author_id}) RETURN reposter.id AS reposter_id, repost.id AS repost_id", ) @@ -97,7 +103,8 @@ pub fn get_post_reposts(author_id: &str, post_id: &str) -> Query { // Get all the replies that a post has received (used for edit/delete notifications) pub fn get_post_replies(author_id: &str, post_id: &str) -> Query { - query( + Query::new( + "get_post_replies", "MATCH (replier:User)-[:AUTHORED]->(reply:Post)-[:REPLIED]->(p:Post {id: $post_id})<-[:AUTHORED]-(author:User {id: $author_id}) RETURN replier.id AS replier_id, reply.id AS reply_id", ) @@ -107,7 +114,8 @@ pub fn get_post_replies(author_id: &str, post_id: &str) -> Query { // Get all the tags/taggers that a post has received (used for edit/delete notifications) pub fn get_post_tags(author_id: &str, post_id: &str) -> Query { - query( + Query::new( + "get_post_tags", "MATCH (tagger:User)-[t:TAGGED]->(p:Post {id: $post_id})<-[:AUTHORED]-(author:User {id: $author_id}) RETURN tagger.id AS tagger_id, t.id AS tag_id", ) @@ -116,15 +124,16 @@ pub fn get_post_tags(author_id: &str, post_id: &str) -> Query { } pub fn post_relationships(author_id: &str, post_id: &str) -> Query { - query( + Query::new( + "post_relationships", "MATCH (u:User {id: $author_id})-[:AUTHORED]->(p:Post {id: $post_id}) OPTIONAL MATCH (p)-[:REPLIED]->(replied_post:Post)<-[:AUTHORED]-(replied_author:User) OPTIONAL MATCH (p)-[:REPOSTED]->(reposted_post:Post)<-[:AUTHORED]-(reposted_author:User) OPTIONAL MATCH (p)-[:MENTIONED]->(mentioned_user:User) - RETURN - replied_post.id AS replied_post_id, + RETURN + replied_post.id AS replied_post_id, replied_author.id AS replied_author_id, - reposted_post.id AS reposted_post_id, + reposted_post.id AS reposted_post_id, reposted_author.id AS reposted_author_id, COLLECT(mentioned_user.id) AS mentioned_user_ids", ) @@ -135,14 +144,15 @@ pub fn 
post_relationships(author_id: &str, post_id: &str) -> Query { // Retrieve many users by id // We return also id if not we will not get not found users pub fn get_users_details_by_ids(user_ids: &[&str]) -> Query { - query( + Query::new( + "get_users_details_by_ids", " UNWIND $ids AS id OPTIONAL MATCH (record:User {id: id}) - RETURN + RETURN id, - CASE - WHEN record IS NOT NULL + CASE + WHEN record IS NOT NULL THEN record ELSE null END AS record @@ -152,8 +162,9 @@ pub fn get_users_details_by_ids(user_ids: &[&str]) -> Query { } /// Retrieves unique global tags for posts, returning a list of `post_ids` and `timestamp` pairs for each tag label. -pub fn global_tags_by_post() -> neo4rs::Query { - query( +pub fn global_tags_by_post() -> Query { + Query::new( + "global_tags_by_post", " MATCH (tagger:User)-[t:TAGGED]->(post:Post)<-[:AUTHORED]-(author:User) WITH t.label AS label, author.id + ':' + post.id AS post_id, post.indexed_at AS score @@ -168,8 +179,9 @@ pub fn global_tags_by_post() -> neo4rs::Query { /// Retrieves unique global tags for posts, calculating an engagement score based on tag counts, /// replies, reposts and mentions. The query returns a `key` by combining author's ID /// and post's ID, along with a sorted set of engagement scores for each tag label. 
-pub fn global_tags_by_post_engagement() -> neo4rs::Query { - query( +pub fn global_tags_by_post_engagement() -> Query { + Query::new( + "global_tags_by_post_engagement", " MATCH (author:User)-[:AUTHORED]->(post:Post)<-[tag:TAGGED]-(tagger:User) WITH post, COUNT(tag) AS tags_count, tag.label AS label, author.id + ':' + post.id AS key @@ -183,13 +195,14 @@ pub fn global_tags_by_post_engagement() -> neo4rs::Query { WITH label, COLLECT([toFloat(taggers + replies_count + reposts_count + mention_count), key ]) AS sorted_set RETURN label, sorted_set order by label - " + ", ) } // Retrieve all the tags of the post -pub fn post_tags(user_id: &str, post_id: &str) -> neo4rs::Query { - query( +pub fn post_tags(user_id: &str, post_id: &str) -> Query { + Query::new( + "post_tags", " MATCH (u:User {id: $user_id})-[:AUTHORED]->(p:Post {id: $post_id}) CALL { @@ -202,7 +215,7 @@ pub fn post_tags(user_id: &str, post_id: &str) -> neo4rs::Query { taggers_count: SIZE(tagger_ids) }) AS tags } - RETURN + RETURN u IS NOT NULL AS exists, tags ", @@ -212,8 +225,9 @@ pub fn post_tags(user_id: &str, post_id: &str) -> neo4rs::Query { } // Retrieve all the tags of the user -pub fn user_tags(user_id: &str) -> neo4rs::Query { - query( +pub fn user_tags(user_id: &str) -> Query { + Query::new( + "user_tags", " MATCH (u:User {id: $user_id}) CALL { @@ -226,7 +240,7 @@ pub fn user_tags(user_id: &str) -> neo4rs::Query { taggers_count: SIZE(tagger_ids) }) AS tags } - RETURN + RETURN u IS NOT NULL AS exists, tags ", @@ -236,7 +250,8 @@ pub fn user_tags(user_id: &str) -> neo4rs::Query { /// Retrieve a homeserver by ID pub fn get_homeserver_by_id(id: &str) -> Query { - query( + Query::new( + "get_homeserver_by_id", "MATCH (hs:Homeserver {id: $id}) WITH hs.id AS id RETURN id", @@ -246,7 +261,8 @@ pub fn get_homeserver_by_id(id: &str) -> Query { /// Retrieves all homeserver IDs pub fn get_all_homeservers() -> Query { - query( + Query::new( + "get_all_homeservers", "MATCH (hs:Homeserver) WITH collect(hs.id) 
AS homeservers_list RETURN homeservers_list", @@ -274,7 +290,7 @@ pub fn get_all_homeservers() -> Query { /// - `label`: The tag label. /// - `taggers`: A list of tagger user IDs who applied the tag. /// - `taggers_count`: The number of taggers who applied the tag. -pub fn get_viewer_trusted_network_tags(user_id: &str, viewer_id: &str, depth: u8) -> neo4rs::Query { +pub fn get_viewer_trusted_network_tags(user_id: &str, viewer_id: &str, depth: u8) -> Query { let graph_query = format!( " MATCH (viewer:User {{id: $viewer_id}}) @@ -297,15 +313,16 @@ pub fn get_viewer_trusted_network_tags(user_id: &str, viewer_id: &str, depth: u8 ); // Add to the query the params - query(graph_query.as_str()) + Query::new("get_viewer_trusted_network_tags", graph_query.as_str()) .param("user_id", user_id) .param("viewer_id", viewer_id) } -pub fn user_counts(user_id: &str) -> neo4rs::Query { - query( +pub fn user_counts(user_id: &str) -> Query { + Query::new( + "user_counts", " - MATCH (u:User {id: $user_id}) + MATCH (u:User {id: $user_id}) // tags that reference this user OPTIONAL MATCH (u)<-[t:TAGGED]-(:User) WITH u, COUNT(DISTINCT t.label) AS unique_tags, @@ -325,7 +342,7 @@ pub fn user_counts(user_id: &str) -> neo4rs::Query { COUNT { (u)-[:TAGGED]->(:Post) } AS post_tags, COUNT { (:User)-[:TAGGED]->(u) } AS tags - RETURN + RETURN u IS NOT NULL AS exists, { following: following, @@ -356,7 +373,7 @@ pub fn get_user_followers(user_id: &str, skip: Option, limit: Option, limit: Option) -> Query { @@ -372,7 +389,7 @@ pub fn get_user_following(user_id: &str, skip: Option, limit: Option String { @@ -389,7 +406,8 @@ fn stream_reach_to_graph_subquery(reach: &StreamReach) -> String { } pub fn get_tags_by_label_prefix(label_prefix: &str) -> Query { - query( + Query::new( + "get_tags_by_label_prefix", " MATCH ()-[t:TAGGED]->() WHERE t.label STARTS WITH $label_prefix @@ -400,7 +418,8 @@ pub fn get_tags_by_label_prefix(label_prefix: &str) -> Query { } pub fn get_tags() -> Query { - query( + 
Query::new( + "get_tags", " MATCH ()-[t:TAGGED]->() RETURN COLLECT(DISTINCT t.label) AS tag_labels @@ -415,9 +434,8 @@ pub fn get_tag_taggers_by_reach( skip: usize, limit: usize, ) -> Query { - query( - format!( - " + let cypher = format!( + " {} // The tagged node can be generic, representing either a Post, a User, or both. // For now, it will be a Post to align with UX requirements. @@ -434,14 +452,13 @@ pub fn get_tag_taggers_by_reach( RETURN COLLECT(row.reach_id) AS tagger_ids ", - stream_reach_to_graph_subquery(&reach) - ) - .as_str(), - ) - .param("label", label) - .param("user_id", user_id) - .param("skip", skip as i64) - .param("limit", limit as i64) + stream_reach_to_graph_subquery(&reach) + ); + Query::new("get_tag_taggers_by_reach", &cypher) + .param("label", label) + .param("user_id", user_id) + .param("skip", skip as i64) + .param("limit", limit as i64) } pub fn get_hot_tags_by_reach( @@ -455,13 +472,12 @@ pub fn get_hot_tags_by_reach( }; let (from, to) = tags_query.timeframe.to_timestamp_range(); - query( - format!( - " + let cypher = format!( + " {} MATCH (reach)-[tag:TAGGED]->(tagged:{}) WHERE user.id = $user_id AND tag.indexed_at >= $from AND tag.indexed_at < $to - WITH + WITH tag.label AS label, COLLECT(DISTINCT reach.id)[..{}] AS taggers, COUNT(DISTINCT tagged) AS uniqueTaggedCount, @@ -476,17 +492,16 @@ pub fn get_hot_tags_by_reach( SKIP $skip LIMIT $limit RETURN COLLECT(hot_tag) as hot_tags ", - stream_reach_to_graph_subquery(&reach), - input_tagged_type, - tags_query.taggers_limit - ) - .as_str(), - ) - .param("user_id", user_id) - .param("skip", tags_query.skip as i64) - .param("limit", tags_query.limit as i64) - .param("from", from) - .param("to", to) + stream_reach_to_graph_subquery(&reach), + input_tagged_type, + tags_query.taggers_limit + ); + Query::new("get_hot_tags_by_reach", &cypher) + .param("user_id", user_id) + .param("skip", tags_query.skip as i64) + .param("limit", tags_query.limit as i64) + .param("from", from) + .param("to", 
to) } pub fn get_global_hot_tags(tags_query: &HotTagsInputDTO) -> Query { @@ -495,12 +510,11 @@ pub fn get_global_hot_tags(tags_query: &HotTagsInputDTO) -> Query { None => String::from("Post|User"), }; let (from, to) = tags_query.timeframe.to_timestamp_range(); - query( - format!( - " - MATCH (user: User)-[tag:TAGGED]->(tagged:{}) + let cypher = format!( + " + MATCH (user: User)-[tag:TAGGED]->(tagged:{}) WHERE tag.indexed_at >= $from AND tag.indexed_at < $to - WITH + WITH tag.label AS label, COLLECT(DISTINCT user.id)[..{}] AS taggers, COUNT(DISTINCT tagged) AS uniqueTaggedCount, @@ -515,14 +529,13 @@ pub fn get_global_hot_tags(tags_query: &HotTagsInputDTO) -> Query { SKIP $skip LIMIT $limit RETURN COLLECT(hot_tag) as hot_tags ", - input_tagged_type, tags_query.taggers_limit - ) - .as_str(), - ) - .param("skip", tags_query.skip as i64) - .param("limit", tags_query.limit as i64) - .param("from", from) - .param("to", to) + input_tagged_type, tags_query.taggers_limit + ); + Query::new("get_global_hot_tags", &cypher) + .param("skip", tags_query.skip as i64) + .param("limit", tags_query.limit as i64) + .param("from", from) + .param("to", to) } pub fn get_influencers_by_reach( @@ -533,9 +546,8 @@ pub fn get_influencers_by_reach( timeframe: &Timeframe, ) -> Query { let (from, to) = timeframe.to_timestamp_range(); - query( - format!( - " + let cypher = format!( + " {} WHERE user.id = $user_id WITH DISTINCT reach @@ -562,24 +574,24 @@ pub fn get_influencers_by_reach( score: (tags_count + posts_count) * sqrt(followers_count) }} AS influencer ORDER BY influencer.score DESC - SKIP $skip + SKIP $skip LIMIT $limit RETURN COLLECT([influencer.id, influencer.score]) as influencers ", - stream_reach_to_graph_subquery(&reach), - ) - .as_str(), - ) - .param("user_id", user_id) - .param("skip", skip as i64) - .param("limit", limit as i64) - .param("from", from) - .param("to", to) + stream_reach_to_graph_subquery(&reach), + ); + Query::new("get_influencers_by_reach", &cypher) + 
.param("user_id", user_id) + .param("skip", skip as i64) + .param("limit", limit as i64) + .param("from", from) + .param("to", to) } pub fn get_global_influencers(skip: usize, limit: usize, timeframe: &Timeframe) -> Query { let (from, to) = timeframe.to_timestamp_range(); - query( + Query::new( + "get_global_influencers", " MATCH (user:User) WHERE user.name <> '[DELETED]' @@ -590,7 +602,7 @@ pub fn get_global_influencers(skip: usize, limit: usize, timeframe: &Timeframe) OPTIONAL MATCH (user)-[tag:TAGGED]->(tagged:Post) WHERE tag.indexed_at >= $from AND tag.indexed_at < $to - + OPTIONAL MATCH (user)-[authored:AUTHORED]->(post:Post) WHERE authored.indexed_at >= $from AND authored.indexed_at < $to @@ -601,9 +613,9 @@ pub fn get_global_influencers(skip: usize, limit: usize, timeframe: &Timeframe) score: (tags_count + posts_count) * sqrt(followers_count) } AS influencer WHERE influencer.id IS NOT NULL - + ORDER BY influencer.score DESC, influencer.id ASC - SKIP $skip + SKIP $skip LIMIT $limit RETURN COLLECT([influencer.id, influencer.score]) as influencers ", @@ -615,7 +627,8 @@ pub fn get_global_influencers(skip: usize, limit: usize, timeframe: &Timeframe) } pub fn get_files_by_ids(key_pair: &[&[&str]]) -> Query { - query( + Query::new( + "get_files_by_ids", " UNWIND $pairs AS pair OPTIONAL MATCH (record:File {owner_id: pair[0], id: pair[1]}) @@ -778,7 +791,20 @@ pub fn post_stream( } // Build the query and apply parameters using `param` method - build_query_with_params(&cypher, &source, tags, kind, &pagination) + let query = Query::new( + match &source { + StreamSource::Following { .. } => "post_stream_following", + StreamSource::Followers { .. } => "post_stream_followers", + StreamSource::Friends { .. } => "post_stream_friends", + StreamSource::Bookmarks { .. } => "post_stream_bookmarks", + StreamSource::Author { .. } => "post_stream_author", + StreamSource::AuthorReplies { .. } => "post_stream_author_replies", + StreamSource::PostReplies { .. 
} => "post_stream_post_replies", + StreamSource::All => "post_stream_all", + }, + &cypher, + ); + build_query_with_params(query, &source, tags, kind, &pagination) } /// Appends a condition to the Cypher query, using `WHERE` if no `WHERE` clause @@ -799,28 +825,22 @@ fn append_condition(cypher: &mut String, condition: &str, where_clause_applied: } } -/// Builds a `Query` object by applying the necessary parameters to the Cypher query string. -/// -/// This function takes the constructed Cypher query string and applies all the relevant parameters -/// based on the provided `source`, `tags`, `kind`, and `pagination`. It ensures that all parameters -/// used in the query are properly set with their corresponding values. +/// Applies the necessary parameters to an already-constructed `Query`. /// /// # Arguments /// -/// * `cypher` - The Cypher query string that has been constructed. +/// * `query` - A `Query` already constructed with its label and cypher string. /// * `source` - The `StreamSource` specifying the origin of the posts (e.g., Following, Followers). /// * `tags` - An optional list of tag labels to filter the posts. /// * `kind` - An optional `PubkyAppPostKind` to filter the posts by their kind. /// * `pagination` - The `Pagination` object containing pagination parameters like `start`, `end`, `skip`, and `limit`. 
fn build_query_with_params( - cypher: &str, + mut query: Query, source: &StreamSource, tags: &Option>, kind: Option, pagination: &Pagination, ) -> Query { - let mut query = query(cypher); - if let Some(observer_id) = source.get_observer() { query = query.param("observer_id", observer_id.to_string()); } @@ -847,13 +867,14 @@ fn build_query_with_params( /// # Arguments /// * `user_id` - The unique identifier of the user pub fn user_is_safe_to_delete(user_id: &str) -> Query { - query( + Query::new( + "user_is_safe_to_delete", " MATCH (u:User {id: $user_id}) // Ensures all relationships to the user (u) are checked, counting as 0 if none exist OPTIONAL MATCH (u)-[r]-() // Checks if the user has any relationships - WITH u, NOT (COUNT(r) = 0) AS flag + With u, NOT (COUNT(r) = 0) AS flag RETURN flag ", ) @@ -868,7 +889,8 @@ pub fn user_is_safe_to_delete(user_id: &str) -> Query { /// * `author_id` - The unique identifier of the user who authored the post /// * `post_id` - The unique identifier of the post pub fn post_is_safe_to_delete(author_id: &str, post_id: &str) -> Query { - query( + Query::new( + "post_is_safe_to_delete", " MATCH (u:User {id: $author_id})-[:AUTHORED]->(p:Post {id: $post_id}) // Ensures all relationships to the post (p) are checked, counting as 0 if none exist @@ -885,7 +907,7 @@ pub fn post_is_safe_to_delete(author_id: &str, post_id: &str) -> Query { (type(r) = 'REPLIED' AND startNode(r) = p) ) // Checks if any disallowed relationships exist for the post - WITH p, NOT (COUNT(r) = 0) AS flag + With p, NOT (COUNT(r) = 0) AS flag RETURN flag ", ) @@ -895,8 +917,9 @@ pub fn post_is_safe_to_delete(author_id: &str, post_id: &str) -> Query { /// Find user recommendations: active users (with 5+ posts) who are 1-3 degrees of separation away /// from the given user, but not directly followed by them -pub fn recommend_users(user_id: &str, limit: usize) -> neo4rs::Query { - query( +pub fn recommend_users(user_id: &str, limit: usize) -> Query { + Query::new( + 
"recommend_users", " MATCH (user:User {id: $user_id}) MATCH (user)-[:FOLLOWS*1..3]->(potential:User) @@ -915,8 +938,9 @@ pub fn recommend_users(user_id: &str, limit: usize) -> neo4rs::Query { } /// Retrieve specific tag created by the user -pub fn get_tag_by_tagger_and_id(tagger_id: &str, tag_id: &str) -> neo4rs::Query { - query( +pub fn get_tag_by_tagger_and_id(tagger_id: &str, tag_id: &str) -> Query { + Query::new( + "get_tag_by_tagger_and_id", " MATCH (tagger:User { id: $tagger_id})-[tag:TAGGED {id: $tag_id }]->(tagged) OPTIONAL MATCH (author:User)-[:AUTHORED]->(tagged) diff --git a/nexus-common/src/db/graph/queries/put.rs b/nexus-common/src/db/graph/queries/put.rs index e9def1a08..9af5123d0 100644 --- a/nexus-common/src/db/graph/queries/put.rs +++ b/nexus-common/src/db/graph/queries/put.rs @@ -1,7 +1,7 @@ use crate::db::graph::error::{GraphError, GraphResult}; +use crate::db::graph::Query; use crate::models::post::PostRelationships; use crate::models::{file::FileDetails, post::PostDetails, user::UserDetails}; -use neo4rs::{query, Query}; use pubky_app_specs::{ParsedUri, Resource}; /// Create a user node @@ -9,7 +9,8 @@ pub fn create_user(user: &UserDetails) -> GraphResult { let links = serde_json::to_string(&user.links) .map_err(|e| GraphError::SerializationFailed(Box::new(e)))?; - let query = query( + let query = Query::new( + "create_user", "MERGE (u:User {id: $id}) SET u.name = $name, u.bio = $bio, u.status = $status, u.links = $links, u.image = $image, u.indexed_at = $indexed_at;", ) @@ -75,7 +76,7 @@ pub fn create_post( let kind = serde_json::to_string(&post.kind) .map_err(|e| GraphError::SerializationFailed(Box::new(e)))?; - let mut cypher_query = query(&cypher) + let mut cypher_query = Query::new("create_post", &cypher) .param("author_id", post.author.to_string()) .param("post_id", post.id.to_string()) .param("content", post.content.to_string()) @@ -143,7 +144,8 @@ pub fn create_mention_relationship( post_id: &str, mentioned_user_id: &str, ) -> Query { - 
query( + Query::new( + "create_mention_relationship", "MATCH (author:User {id: $author_id})-[:AUTHORED]->(post:Post {id: $post_id}), (mentioned_user:User {id: $mentioned_user_id}) MERGE (post)-[:MENTIONED]->(mentioned_user)", @@ -161,7 +163,8 @@ pub fn create_mention_relationship( /// * `followee_id` - The unique identifier of the user to be followed. /// * `indexed_at` - A timestamp representing when the relationship was indexed or updated. pub fn create_follow(follower_id: &str, followee_id: &str, indexed_at: i64) -> Query { - query( + Query::new( + "create_follow", "MATCH (follower:User {id: $follower_id}), (followee:User {id: $followee_id}) // Check if follow already existed OPTIONAL MATCH (follower)-[existing:FOLLOWS]->(followee) @@ -189,7 +192,8 @@ pub fn create_post_bookmark( bookmark_id: &str, indexed_at: i64, ) -> Query { - query( + Query::new( + "create_post_bookmark", "MATCH (u:User {id: $user_id}) // We assume these nodes are already created. If not we would not be able to add a bookmark MATCH (author:User {id: $author_id})-[:AUTHORED]->(p:Post {id: $post_id}) @@ -226,7 +230,8 @@ pub fn create_post_tag( label: &str, indexed_at: i64, ) -> Query { - query( + Query::new( + "create_post_tag", "MATCH (user:User {id: $user_id}) // We assume these nodes are already created. 
If not we would not be able to add a tag MATCH (author:User {id: $author_id})-[:AUTHORED]->(post:Post {id: $post_id}) @@ -260,7 +265,8 @@ pub fn create_user_tag( label: &str, indexed_at: i64, ) -> Query { - query( + Query::new( + "create_user_tag", "MATCH (tagged_used:User {id: $tagged_user_id}) MATCH (tagger:User {id: $tagger_user_id}) // Check if tag already existed @@ -283,7 +289,8 @@ pub fn create_file(file: &FileDetails) -> GraphResult { let urls = serde_json::to_string(&file.urls) .map_err(|e| GraphError::SerializationFailed(Box::new(e)))?; - let query = query( + let query = Query::new( + "create_file", "MERGE (f:File {id: $id, owner_id: $owner_id}) SET f.uri = $uri, f.indexed_at = $indexed_at, f.created_at = $created_at, f.size = $size, f.src = $src, f.name = $name, f.content_type = $content_type, f.urls = $urls;", @@ -304,7 +311,8 @@ pub fn create_file(file: &FileDetails) -> GraphResult { /// Create a homeserver pub fn create_homeserver(homeserver_id: &str) -> Query { - query( + Query::new( + "create_homeserver", "MERGE (hs:Homeserver { id: $id }) diff --git a/nexus-common/src/db/graph/query.rs b/nexus-common/src/db/graph/query.rs new file mode 100644 index 000000000..e19946885 --- /dev/null +++ b/nexus-common/src/db/graph/query.rs @@ -0,0 +1,324 @@ +use std::fmt::Write; + +use neo4rs::{BoltList, BoltMap, BoltString, BoltType}; + +/// Our own `Query` type that mirrors `neo4rs::Query` but exposes +/// `cypher()` and `params_map()` for logging and tracing. 
+#[derive(Clone)] +pub struct Query { + label: Option<&'static str>, + cypher: String, + params: BoltMap, +} + +impl Query { + pub fn new(label: &'static str, cypher: impl Into) -> Self { + Self { + label: Some(label), + cypher: cypher.into(), + params: BoltMap::default(), + } + } + + pub fn label(&self) -> Option<&'static str> { + self.label + } + + pub fn param>(mut self, key: &str, value: T) -> Self { + self.params.put(key.into(), value.into()); + self + } + + pub fn params(mut self, input: impl IntoIterator) -> Self + where + K: Into, + V: Into, + { + for (k, v) in input { + self.params.put(k.into(), v.into()); + } + self + } + + pub fn cypher(&self) -> &str { + &self.cypher + } + + pub fn params_map(&self) -> &BoltMap { + &self.params + } + + /// Returns the cypher string with `$param` placeholders replaced by their + /// literal values, ready to copy-paste into a Neo4j browser. + pub fn to_cypher_populated(&self) -> String { + populate_cypher(&self.cypher, &self.params) + } +} + +/// Replaces `$param` placeholders in `cypher` with literal values from `params`. +pub fn populate_cypher(cypher: &str, params: &BoltMap) -> String { + let mut out = cypher.to_owned(); + // Sort keys by length descending so `$skip` is replaced before a + // hypothetical `$s`, avoiding partial substitutions. + // + // NOTE: There is still a potential issue with this approach: if a + // parameter *value* happens to contain text matching another parameter + // name (e.g. param "a" has value "$b"), a later replacement pass will + // substitute inside the already-replaced value. A proper fix would + // require single-pass replacement or placeholder-based substitution. 
+ let mut entries: Vec<_> = params.value.iter().collect(); + entries.sort_by(|a, b| b.0.value.len().cmp(&a.0.value.len())); + for (k, v) in entries { + let placeholder = format!("${}", k.value); + let literal = bolt_to_cypher_literal(v); + out = out.replace(&placeholder, &literal); + } + out +} + +/// Format a `BoltType` value as a Neo4j cypher literal. +fn bolt_to_cypher_literal(val: &BoltType) -> String { + match val { + BoltType::String(s) => format!( + "'{}'", + s.value + .replace('\\', "\\\\") + .replace('\'', "\\'") + .replace('\n', "\\n") + .replace('\r', "\\r") + .replace('\t', "\\t") + ), + BoltType::Integer(i) => i.value.to_string(), + BoltType::Float(f) => format!("{}", f.value), + BoltType::Boolean(b) => if b.value { "true" } else { "false" }.to_string(), + BoltType::Null(_) => "null".to_string(), + BoltType::List(list) => bolt_list_to_cypher(list), + BoltType::Map(map) => bolt_map_to_cypher(map), + other => format!("{:?}", other), + } +} + +fn bolt_list_to_cypher(list: &BoltList) -> String { + let mut out = String::from('['); + for (i, item) in list.value.iter().enumerate() { + if i > 0 { + out.push_str(", "); + } + out.push_str(&bolt_to_cypher_literal(item)); + } + out.push(']'); + out +} + +fn bolt_map_to_cypher(map: &BoltMap) -> String { + let mut out = String::from('{'); + for (i, (k, v)) in map.value.iter().enumerate() { + if i > 0 { + out.push_str(", "); + } + let _ = write!(out, "{}: {}", k.value, bolt_to_cypher_literal(v)); + } + out.push('}'); + out +} + +impl From for neo4rs::Query { + fn from(q: Query) -> neo4rs::Query { + let mut nq = neo4rs::Query::new(q.cypher); + for (k, v) in q.params.value { + nq = nq.param(&k.value, v); + } + nq + } +} + +#[cfg(test)] +fn query(cypher: impl Into) -> Query { + Query { + label: None, + cypher: cypher.into(), + params: BoltMap::default(), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // ── bolt_to_cypher_literal ────────────────────────────────────── + + #[test] + fn literal_string_plain() { + 
let val = BoltType::String("hello".into()); + assert_eq!(bolt_to_cypher_literal(&val), "'hello'"); + } + + #[test] + fn literal_string_escapes_quotes_and_backslashes() { + let val = BoltType::String("it's a \\path".into()); + assert_eq!(bolt_to_cypher_literal(&val), "'it\\'s a \\\\path'"); + } + + #[test] + fn literal_string_escapes_control_chars() { + let val = BoltType::String("line1\nline2\r\tend".into()); + assert_eq!(bolt_to_cypher_literal(&val), "'line1\\nline2\\r\\tend'"); + } + + #[test] + fn literal_string_empty() { + let val = BoltType::String("".into()); + assert_eq!(bolt_to_cypher_literal(&val), "''"); + } + + #[test] + fn literal_integer() { + let val = BoltType::Integer(neo4rs::BoltInteger::new(42)); + assert_eq!(bolt_to_cypher_literal(&val), "42"); + } + + #[test] + fn literal_negative_integer() { + let val = BoltType::Integer(neo4rs::BoltInteger::new(-1)); + assert_eq!(bolt_to_cypher_literal(&val), "-1"); + } + + #[test] + fn literal_float() { + let val = BoltType::Float(neo4rs::BoltFloat::new(3.01)); + assert_eq!(bolt_to_cypher_literal(&val), "3.01"); + } + + #[test] + fn literal_boolean() { + assert_eq!( + bolt_to_cypher_literal(&BoltType::Boolean(neo4rs::BoltBoolean::new(true))), + "true" + ); + assert_eq!( + bolt_to_cypher_literal(&BoltType::Boolean(neo4rs::BoltBoolean::new(false))), + "false" + ); + } + + #[test] + fn literal_null() { + let val = BoltType::Null(neo4rs::BoltNull); + assert_eq!(bolt_to_cypher_literal(&val), "null"); + } + + #[test] + fn literal_list() { + let list = BoltList::from(vec![ + BoltType::Integer(neo4rs::BoltInteger::new(1)), + BoltType::String("two".into()), + BoltType::Boolean(neo4rs::BoltBoolean::new(false)), + ]); + assert_eq!( + bolt_to_cypher_literal(&BoltType::List(list)), + "[1, 'two', false]" + ); + } + + #[test] + fn literal_list_empty() { + let list = BoltList::from(Vec::::new()); + assert_eq!(bolt_to_cypher_literal(&BoltType::List(list)), "[]"); + } + + #[test] + fn literal_map_single_entry() { + let mut map 
= BoltMap::default(); + map.put( + "key".into(), + BoltType::Integer(neo4rs::BoltInteger::new(99)), + ); + assert_eq!(bolt_to_cypher_literal(&BoltType::Map(map)), "{key: 99}"); + } + + // ── populate_cypher ───────────────────────────────────────────── + + #[test] + fn populate_basic_substitution() { + let q = query("MATCH (u:User {id: $id}) RETURN u").param("id", "abc123"); + assert_eq!( + q.to_cypher_populated(), + "MATCH (u:User {id: 'abc123'}) RETURN u" + ); + } + + #[test] + fn populate_multiple_params() { + let q = query("MATCH (u:User {id: $id}) SET u.name = $name") + .param("id", "abc") + .param("name", "Alice"); + let result = q.to_cypher_populated(); + assert!(result.contains("'abc'")); + assert!(result.contains("'Alice'")); + assert!(!result.contains("$id")); + assert!(!result.contains("$name")); + } + + #[test] + fn populate_no_params() { + let q = query("RETURN 1"); + assert_eq!(q.to_cypher_populated(), "RETURN 1"); + } + + #[test] + fn populate_prefix_overlap_longer_replaced_first() { + // $user_id should not be partially matched by $user + let q = query("MATCH (u {id: $user_id, name: $user})") + .param("user_id", "id123") + .param("user", "Alice"); + let result = q.to_cypher_populated(); + assert_eq!(result, "MATCH (u {id: 'id123', name: 'Alice'})"); + } + + #[test] + fn populate_integer_and_bool_params() { + let q = query("MATCH (p) WHERE p.age > $age AND p.active = $active RETURN p") + .param("age", 18_i64) + .param("active", true); + let result = q.to_cypher_populated(); + assert!(result.contains("> 18")); + assert!(result.contains("= true")); + } + + #[test] + fn populate_param_not_in_cypher_is_ignored() { + let q = query("RETURN 1").param("unused", "value"); + assert_eq!(q.to_cypher_populated(), "RETURN 1"); + } + + #[test] + fn populate_special_chars_in_value() { + let q = query("SET u.bio = $bio").param("bio", "it's a\nnew \"line\""); + let result = q.to_cypher_populated(); + assert_eq!(result, "SET u.bio = 'it\\'s a\\nnew \"line\"'"); + } + + 
// ── From for neo4rs::Query ─────────────────────────────── + + #[test] + fn into_neo4rs_query_preserves_cypher() { + let q = query("RETURN $x").param("x", 42_i64); + let nq: neo4rs::Query = q.into(); + // neo4rs::Query doesn't expose cypher publicly, but if the + // conversion compiles and doesn't panic, the basic contract holds. + let _ = nq; + } + + // ── Query builder ─────────────────────────────────────────────── + + #[test] + fn query_builder_params_batch() { + let q = query("MATCH (u {id: $id, name: $name})").params(vec![ + (BoltString::from("id"), BoltType::String("abc".into())), + (BoltString::from("name"), BoltType::String("Bob".into())), + ]); + let result = q.to_cypher_populated(); + assert!(result.contains("'abc'")); + assert!(result.contains("'Bob'")); + } +} diff --git a/nexus-common/src/db/graph/setup.rs b/nexus-common/src/db/graph/setup.rs index 4633eb92e..090a9cbbd 100644 --- a/nexus-common/src/db/graph/setup.rs +++ b/nexus-common/src/db/graph/setup.rs @@ -1,5 +1,6 @@ -use crate::db::{get_neo4j_graph, graph::error::GraphResult, GraphError}; -use neo4rs::query; +use crate::db::get_neo4j_graph; +use crate::db::graph::error::{GraphError, GraphResult}; +use crate::db::graph::Query; use tracing::info; /// Ensure the Neo4j graph has the required constraints and indexes @@ -28,23 +29,13 @@ pub async fn setup_graph() -> GraphResult<()> { let graph = get_neo4j_graph()?; - // Start an explicit transaction - let txn = graph - .start_txn() - .await - .map_err(|e| GraphError::Generic(format!("Failed to start transaction: {e}")))?; - for &ddl in queries { - graph.run(query(ddl)).await.map_err(|e| { + graph.run(Query::new("setup_ddl", ddl)).await.map_err(|e| { GraphError::Generic(format!( "Failed to apply graph constraint/index '{ddl}': {e}" )) })?; } - // Commit everything in one go - txn.commit() - .await - .map_err(|e| GraphError::Generic(format!("Failed to commit transaction: {e}")))?; info!("Neo4j graph constraints and indexes have been applied 
successfully"); diff --git a/nexus-common/src/db/graph/traced.rs b/nexus-common/src/db/graph/traced.rs new file mode 100644 index 000000000..e5aeccc91 --- /dev/null +++ b/nexus-common/src/db/graph/traced.rs @@ -0,0 +1,363 @@ +use async_trait::async_trait; +use futures::stream::BoxStream; +use futures::{Stream, StreamExt, TryStreamExt}; +use neo4rs::Row; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; +use tracing::warn; + +use super::query::Query; +use crate::db::config::DEFAULT_SLOW_QUERY_THRESHOLD_MS; + +/// Abstraction over graph database operations. +/// Callers depend on this trait, not the concrete implementations. +#[async_trait] +pub trait GraphOps: Send + Sync { + /// Execute query, return boxed row stream. + async fn execute( + &self, + query: Query, + ) -> neo4rs::Result>>; + + /// Fire-and-forget query execution. + async fn run(&self, query: Query) -> neo4rs::Result<()>; +} + +/// Thin wrapper around `neo4rs::Graph` implementing `GraphOps` without tracing. +#[derive(Clone)] +pub struct Graph { + inner: neo4rs::Graph, +} + +impl Graph { + pub fn new(graph: neo4rs::Graph) -> Self { + Self { inner: graph } + } +} + +#[async_trait] +impl GraphOps for Graph { + async fn execute( + &self, + query: Query, + ) -> neo4rs::Result>> { + let stream = self + .inner + .execute(query.into()) + .await? + .into_stream() + .map_err(Into::into) + .boxed(); + Ok(stream) + } + + async fn run(&self, query: Query) -> neo4rs::Result<()> { + self.inner.run(query.into()).await + } +} + +/// A stream wrapper that measures total query time and logs slow queries when dropped. +struct TracedStream { + inner: BoxStream<'static, Result>, + label: Option<&'static str>, + /// Populated cypher text for debug logging (only set when `slow_query_logging_include_cypher` is enabled). + cypher: Option, + /// Pool-acquire + Bolt RUN round-trip (query planning & start of execution). 
+ execute_duration: Duration, + /// Wall-clock time from stream creation to drop (row fetching & consumption). + stream_start: Instant, + row_count: usize, + threshold: Duration, +} + +impl Stream for TracedStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let result = Pin::new(&mut self.inner).poll_next(cx); + if let Poll::Ready(Some(Ok(_))) = &result { + self.row_count += 1; + } + result + } +} + +impl Drop for TracedStream { + fn drop(&mut self) { + if let Some(label) = &self.label { + let fetch_duration = self.stream_start.elapsed(); + let total = self.execute_duration + fetch_duration; + if total > self.threshold { + warn!( + total_ms = total.as_millis(), + execute_ms = self.execute_duration.as_millis(), + fetch_ms = fetch_duration.as_millis(), + rows = self.row_count, + query = %label, + cypher = self.cypher.as_deref().unwrap_or(""), + "Slow Neo4j query" + ); + } + } + } +} + +/// Decorator around a [`GraphOps`] implementation that logs slow queries. 
+#[derive(Clone)] +pub struct TracedGraph { + inner: G, + slow_query_threshold: Duration, + log_cypher: bool, +} + +impl TracedGraph { + pub fn new(graph: G) -> Self { + Self { + inner: graph, + slow_query_threshold: Duration::from_millis(DEFAULT_SLOW_QUERY_THRESHOLD_MS), + log_cypher: false, + } + } + + pub fn with_slow_query_threshold(mut self, threshold: Duration) -> Self { + self.slow_query_threshold = threshold; + self + } + + pub fn with_log_cypher(mut self, enabled: bool) -> Self { + self.log_cypher = enabled; + self + } +} + +#[async_trait] +impl GraphOps for TracedGraph { + async fn execute( + &self, + query: Query, + ) -> neo4rs::Result>> { + let label = query.label(); + let cypher = if self.log_cypher { + Some(query.to_cypher_populated()) + } else { + None + }; + let start = Instant::now(); + let stream = self.inner.execute(query).await?; + let execute_duration = start.elapsed(); + + let traced = TracedStream { + inner: stream, + label, + cypher, + execute_duration, + stream_start: Instant::now(), // Timestamp after execute(), so it only tracks the fetch phase + row_count: 0, + threshold: self.slow_query_threshold, + }; + Ok(traced.boxed()) + } + + async fn run(&self, query: Query) -> neo4rs::Result<()> { + let label = query.label(); + let cypher = if self.log_cypher { + Some(query.to_cypher_populated()) + } else { + None + }; + let start = Instant::now(); + let result = self.inner.run(query).await; + let elapsed = start.elapsed(); + + if let Some(label) = &label { + if elapsed > self.slow_query_threshold { + warn!(elapsed_ms = elapsed.as_millis(), query = %label, cypher = cypher.as_deref().unwrap_or(""), "Slow Neo4j query"); + } + } + + result + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::stream; + use neo4rs::{BoltList, BoltType}; + use tracing_test::traced_test; + + /// Create a dummy `Row` with a single string field. 
+ fn dummy_row() -> Row { + let fields = BoltList::from(vec![BoltType::String("n".into())]); + let data = BoltList::from(vec![BoltType::String("value".into())]); + Row::new(fields, data) + } + + fn make_traced_stream( + inner: BoxStream<'static, Result>, + label: Option<&'static str>, + threshold: Duration, + ) -> TracedStream { + TracedStream { + inner, + label, + cypher: None, + execute_duration: Duration::ZERO, + stream_start: Instant::now(), + row_count: 0, + threshold, + } + } + + // ── TracedStream row counting ────────────────────────────────── + + #[tokio::test] + async fn counts_rows_from_inner_stream() { + let rows = vec![Ok(dummy_row()), Ok(dummy_row()), Ok(dummy_row())]; + let inner = stream::iter(rows).boxed(); + let mut ts = make_traced_stream(inner, Some("test"), Duration::from_secs(100)); + + while ts.next().await.is_some() {} + assert_eq!(ts.row_count, 3); + } + + #[tokio::test] + async fn empty_stream_yields_none_and_zero_rows() { + let inner = stream::empty().boxed(); + let mut ts = make_traced_stream(inner, Some("test"), Duration::from_secs(100)); + assert!(ts.next().await.is_none()); + assert_eq!(ts.row_count, 0); + } + + // ── Slow-path doesn't abort the stream ───────────────────────── + + #[tokio::test] + async fn slow_query_still_yields_all_rows() { + // Threshold is 0ms — every query is "slow", but all rows must still be returned. + let rows = vec![Ok(dummy_row()), Ok(dummy_row())]; + let inner = stream::iter(rows).boxed(); + let mut ts = make_traced_stream(inner, Some("slow_q"), Duration::ZERO); + + let mut collected = Vec::new(); + while let Some(item) = ts.next().await { + collected.push(item.expect("row should be Ok")); + } + assert_eq!(collected.len(), 2); + assert_eq!(ts.row_count, 2); + } + + // ── warn! 
emission ───────────────────────────────────────────── + + #[tokio::test] + #[traced_test] + async fn emits_warning_when_threshold_exceeded() { + let inner = stream::iter(vec![Ok(dummy_row())]).boxed(); + // threshold = 0 → always slow + let mut ts = make_traced_stream(inner, Some("slow_label"), Duration::ZERO); + while ts.next().await.is_some() {} + drop(ts); + + assert!(logs_contain("Slow Neo4j query")); + assert!(logs_contain("slow_label")); + } + + #[tokio::test] + #[traced_test] + async fn no_warning_when_under_threshold() { + let inner = stream::empty().boxed(); + let ts = make_traced_stream(inner, Some("fast_q"), Duration::from_secs(600)); + drop(ts); + + assert!(!logs_contain("Slow Neo4j query")); + } + + #[tokio::test] + #[traced_test] + async fn no_warning_when_label_is_none() { + let inner = stream::empty().boxed(); + // threshold = 0 but no label → drop skips logging entirely + let ts = make_traced_stream(inner, None, Duration::ZERO); + drop(ts); + + assert!(!logs_contain("Slow Neo4j query")); + } + + #[tokio::test] + #[traced_test] + async fn warning_includes_cypher_when_set() { + let ts = TracedStream { + inner: stream::empty().boxed(), + label: Some("cypher_q"), + cypher: Some("MATCH (n) RETURN n".into()), + execute_duration: Duration::ZERO, + stream_start: Instant::now(), + row_count: 0, + threshold: Duration::ZERO, + }; + drop(ts); + + assert!(logs_contain("MATCH (n) RETURN n")); + } + + // ── TracedGraph ──────────────────────────────────────────────── + + /// Mock `GraphOps` for testing `TracedGraph` without a real Neo4j connection. 
+ #[derive(Clone)] + struct MockGraph { + row_count: usize, + } + + #[async_trait] + impl GraphOps for MockGraph { + async fn execute( + &self, + _query: Query, + ) -> neo4rs::Result>> { + let rows: Vec> = + (0..self.row_count).map(|_| Ok(dummy_row())).collect(); + Ok(stream::iter(rows).boxed()) + } + + async fn run(&self, _query: Query) -> neo4rs::Result<()> { + Ok(()) + } + } + + fn test_query() -> Query { + Query::new("test_label", "MATCH (n) RETURN n") + } + + #[tokio::test] + async fn traced_graph_execute_returns_all_rows() { + let tg = TracedGraph::new(MockGraph { row_count: 3 }); + let mut stream = tg.execute(test_query()).await.unwrap(); + + let mut count = 0; + while stream.next().await.is_some() { + count += 1; + } + assert_eq!(count, 3); + } + + #[tokio::test] + #[traced_test] + async fn traced_graph_run_warns_on_slow_query() { + let tg = + TracedGraph::new(MockGraph { row_count: 0 }).with_slow_query_threshold(Duration::ZERO); + tg.run(test_query()).await.unwrap(); + + assert!(logs_contain("Slow Neo4j query")); + assert!(logs_contain("test_label")); + } + + #[tokio::test] + #[traced_test] + async fn traced_graph_run_no_warning_under_threshold() { + let tg = TracedGraph::new(MockGraph { row_count: 0 }) + .with_slow_query_threshold(Duration::from_secs(600)); + tg.run(test_query()).await.unwrap(); + + assert!(!logs_contain("Slow Neo4j query")); + } +} diff --git a/nexus-common/src/db/mod.rs b/nexus-common/src/db/mod.rs index f6a7e2bad..604e14961 100644 --- a/nexus-common/src/db/mod.rs +++ b/nexus-common/src/db/mod.rs @@ -13,4 +13,5 @@ pub use graph::error::{GraphError, GraphResult}; pub use graph::exec::*; pub use graph::queries; pub use graph::setup; +pub use graph::GraphOps; pub use kv::RedisOps; diff --git a/nexus-common/src/db/reindex.rs b/nexus-common/src/db/reindex.rs index 3d798b370..9372d9868 100644 --- a/nexus-common/src/db/reindex.rs +++ b/nexus-common/src/db/reindex.rs @@ -1,4 +1,5 @@ use crate::db::graph::exec::fetch_all_rows_from_graph; +use 
crate::db::graph::Query; use crate::models::follow::{Followers, Following, UserFollows}; use crate::models::post::search::PostsByTagSearch; use crate::models::post::Bookmark; @@ -14,7 +15,6 @@ use crate::{ models::post::{PostCounts, PostDetails, PostRelationships}, models::user::UserCounts, }; -use neo4rs::query; use tokio::task::JoinSet; use tracing::info; @@ -100,7 +100,7 @@ pub async fn reindex_post(author_id: &str, post_id: &str) -> Result<(), DynError } pub async fn get_all_user_ids() -> Result, DynError> { - let query = query("MATCH (u:User) RETURN u.id AS id"); + let query = Query::new("get_all_user_ids", "MATCH (u:User) RETURN u.id AS id"); let rows = fetch_all_rows_from_graph(query).await?; let mut user_ids = Vec::new(); @@ -114,8 +114,10 @@ pub async fn get_all_user_ids() -> Result, DynError> { } async fn get_all_post_ids() -> Result, DynError> { - let query = - query("MATCH (u:User)-[:AUTHORED]->(p:Post) RETURN u.id AS author_id, p.id AS post_id"); + let query = Query::new( + "get_all_post_ids", + "MATCH (u:User)-[:AUTHORED]->(p:Post) RETURN u.id AS author_id, p.id AS post_id", + ); let rows = fetch_all_rows_from_graph(query).await?; let mut post_ids = Vec::new(); diff --git a/nexus-common/src/models/file/details.rs b/nexus-common/src/models/file/details.rs index 9dd5bf036..e884edb5c 100644 --- a/nexus-common/src/models/file/details.rs +++ b/nexus-common/src/models/file/details.rs @@ -1,4 +1,4 @@ -use crate::db::graph::GraphResult; +use crate::db::graph::{GraphResult, Query}; use crate::db::kv::RedisResult; use crate::db::{exec_single_row, queries, RedisOps}; use crate::media::FileVariant; @@ -6,7 +6,6 @@ use crate::models::error::ModelResult; use crate::models::traits::Collection; use async_trait::async_trait; use chrono::Utc; -use neo4rs::Query; use pubky_app_specs::{ParsedUri, PubkyAppFile, Resource}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; diff --git a/nexus-common/src/models/follow/followers.rs 
b/nexus-common/src/models/follow/followers.rs index d31a4a20f..aa7f10830 100644 --- a/nexus-common/src/models/follow/followers.rs +++ b/nexus-common/src/models/follow/followers.rs @@ -1,6 +1,6 @@ +use crate::db::graph::Query; use crate::db::{queries, RedisOps}; use async_trait::async_trait; -use neo4rs::Query; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; diff --git a/nexus-common/src/models/follow/following.rs b/nexus-common/src/models/follow/following.rs index 5e3f5ad2a..f2e24cfb9 100644 --- a/nexus-common/src/models/follow/following.rs +++ b/nexus-common/src/models/follow/following.rs @@ -1,6 +1,6 @@ +use crate::db::graph::Query; use crate::db::{queries, RedisOps}; use async_trait::async_trait; -use neo4rs::Query; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; diff --git a/nexus-common/src/models/follow/traits.rs b/nexus-common/src/models/follow/traits.rs index f4ca17b5a..e91b915c3 100644 --- a/nexus-common/src/models/follow/traits.rs +++ b/nexus-common/src/models/follow/traits.rs @@ -1,3 +1,4 @@ +use crate::db::graph::Query; use crate::db::kv::RedisResult; use crate::db::{ execute_graph_operation, fetch_row_from_graph, queries, GraphResult, OperationOutcome, RedisOps, @@ -5,7 +6,6 @@ use crate::db::{ use crate::models::error::ModelResult; use async_trait::async_trait; use chrono::Utc; -use neo4rs::Query; #[async_trait] pub trait UserFollows: Sized + RedisOps + AsRef<[String]> + Default { diff --git a/nexus-common/src/models/notification/mod.rs b/nexus-common/src/models/notification/mod.rs index ebe38d187..859d140b4 100644 --- a/nexus-common/src/models/notification/mod.rs +++ b/nexus-common/src/models/notification/mod.rs @@ -79,7 +79,7 @@ pub enum NotificationBody { }, } -type QueryFunction = fn(&str, &str) -> neo4rs::Query; +type QueryFunction = fn(&str, &str) -> crate::db::graph::Query; type ExtractFunction = Box (String, String) + Send>; impl Default for NotificationBody { diff --git a/nexus-common/src/models/post/search.rs 
b/nexus-common/src/models/post/search.rs index 12c75036d..50cccc89e 100644 --- a/nexus-common/src/models/post/search.rs +++ b/nexus-common/src/models/post/search.rs @@ -1,3 +1,4 @@ +use crate::db::graph::Query; use crate::db::kv::{RedisResult, ScoreAction, SortOrder}; use crate::db::queries::get::{global_tags_by_post, global_tags_by_post_engagement}; use crate::db::{fetch_all_rows_from_graph, RedisOps}; @@ -6,7 +7,6 @@ use crate::models::post::PostDetails; use crate::models::tag::post::TagPost; use crate::models::tag::traits::TaggersCollection; use crate::types::{Pagination, StreamSorting}; -use neo4rs::Query; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; diff --git a/nexus-common/src/models/post/stream.rs b/nexus-common/src/models/post/stream.rs index d1d391f93..fb9bd8790 100644 --- a/nexus-common/src/models/post/stream.rs +++ b/nexus-common/src/models/post/stream.rs @@ -8,6 +8,7 @@ use crate::models::{ post::search::PostsByTagSearch, }; use crate::types::{Pagination, StreamSorting}; +use futures::TryStreamExt; use pubky_app_specs::PubkyAppPostKind; use serde::{Deserialize, Serialize}; use tokio::task::spawn; @@ -271,7 +272,7 @@ impl PostStream { // Track the last post's indexed_at value let mut last_post_indexed_at: Option = None; - while let Some(row) = result.next().await? { + while let Some(row) = result.try_next().await? 
{ let author_id: String = row.get("author_id")?; let post_id: String = row.get("post_id")?; let indexed_at: i64 = row.get("indexed_at")?; diff --git a/nexus-common/src/models/tag/traits/collection.rs b/nexus-common/src/models/tag/traits/collection.rs index caadacddc..7d1b75612 100644 --- a/nexus-common/src/models/tag/traits/collection.rs +++ b/nexus-common/src/models/tag/traits/collection.rs @@ -1,10 +1,10 @@ +use crate::db::graph::Query; use crate::db::kv::{RedisResult, ScoreAction, SortOrder}; use crate::db::{ execute_graph_operation, fetch_row_from_graph, queries, GraphResult, OperationOutcome, RedisOps, }; use crate::models::error::ModelResult; use async_trait::async_trait; -use neo4rs::Query; use tracing::error; use crate::models::tag::{post::POST_TAGS_KEY_PARTS, user::USER_TAGS_KEY_PARTS}; diff --git a/nexus-common/src/models/traits.rs b/nexus-common/src/models/traits.rs index 819129209..92ba725c3 100644 --- a/nexus-common/src/models/traits.rs +++ b/nexus-common/src/models/traits.rs @@ -1,9 +1,9 @@ +use crate::db::graph::Query; use crate::db::kv::RedisResult; use crate::db::{exec_single_row, fetch_all_rows_from_graph, GraphResult, RedisOps}; use crate::models::error::ModelResult; use async_trait::async_trait; use core::fmt; -use neo4rs::Query; use std::fmt::Debug; pub trait CollectionId { diff --git a/nexus-common/src/models/user/details.rs b/nexus-common/src/models/user/details.rs index 59d47b48f..43a02a16b 100644 --- a/nexus-common/src/models/user/details.rs +++ b/nexus-common/src/models/user/details.rs @@ -1,11 +1,11 @@ use super::UserSearch; +use crate::db::graph::Query; use crate::db::kv::RedisResult; use crate::db::{exec_single_row, queries, GraphResult, RedisOps}; use crate::models::error::ModelResult; use crate::models::traits::Collection; use async_trait::async_trait; use chrono::Utc; -use neo4rs::Query; use pubky_app_specs::{PubkyAppUser, PubkyAppUserLink, PubkyId}; use serde::{Deserialize, Deserializer, Serialize}; use serde_json; diff --git 
a/nexus-watcher/tests/event_processor/follows/utils.rs b/nexus-watcher/tests/event_processor/follows/utils.rs index a4df8980a..a4ea7ed54 100644 --- a/nexus-watcher/tests/event_processor/follows/utils.rs +++ b/nexus-watcher/tests/event_processor/follows/utils.rs @@ -1,6 +1,6 @@ use anyhow::Result; -use neo4rs::{query, Query}; use nexus_common::db::fetch_key_from_graph; +use nexus_common::db::graph::Query; pub async fn find_follow_relationship(follower: &str, followee: &str) -> Result { let query = user_following_query(follower, followee); @@ -14,7 +14,10 @@ pub async fn find_follow_relationship(follower: &str, followee: &str) -> Result< } fn user_following_query(follower: &str, followee: &str) -> Query { - query(" RETURN EXISTS((:User {id: $follower})-[:FOLLOWS]->(:User {id: $followee})) AS exist") - .param("followee", followee) - .param("follower", follower) + Query::new( + "user_following_query", + "RETURN EXISTS((:User {id: $follower})-[:FOLLOWS]->(:User {id: $followee})) AS exist", + ) + .param("followee", followee) + .param("follower", follower) } diff --git a/nexus-watcher/tests/event_processor/mentions/utils.rs b/nexus-watcher/tests/event_processor/mentions/utils.rs index 20cef9407..205c72650 100644 --- a/nexus-watcher/tests/event_processor/mentions/utils.rs +++ b/nexus-watcher/tests/event_processor/mentions/utils.rs @@ -1,6 +1,6 @@ use anyhow::Result; -use neo4rs::{query, Query}; use nexus_common::db::fetch_key_from_graph; +use nexus_common::db::graph::Query; pub async fn find_post_mentions(follower: &str, followee: &str) -> Result> { let query = post_mention_query(follower, followee); @@ -13,7 +13,8 @@ pub async fn find_post_mentions(follower: &str, followee: &str) -> Result Query { - query( + Query::new( + "post_mention_query", " MATCH (u:User {id: $author_id})-[:AUTHORED]->(p:Post {id: $post_id}) OPTIONAL MATCH (p)-[:MENTIONED]->(mentioned_user:User) diff --git a/nexus-watcher/tests/event_processor/posts/utils.rs 
b/nexus-watcher/tests/event_processor/posts/utils.rs index 2657a8a37..de75f8f94 100644 --- a/nexus-watcher/tests/event_processor/posts/utils.rs +++ b/nexus-watcher/tests/event_processor/posts/utils.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use neo4rs::{query, Query}; +use nexus_common::db::graph::Query; use nexus_common::{ db::{fetch_key_from_graph, RedisOps}, models::post::{ @@ -119,7 +119,8 @@ pub async fn find_repost_relationship_parent_uri(user_id: &str, post_id: &str) - } pub fn post_reply_relationships(author_id: &str, post_id: &str) -> Query { - query( + Query::new( + "post_reply_relationships", "MATCH (u:User {id: $author_id})-[:AUTHORED]->(p:Post {id: $post_id}) OPTIONAL MATCH (p)-[:REPLIED]->(reply:Post)<-[:AUTHORED]-(reply_author:User) RETURN COLLECT([ @@ -131,7 +132,8 @@ pub fn post_reply_relationships(author_id: &str, post_id: &str) -> Query { } pub fn post_repost_relationships(author_id: &str, post_id: &str) -> Query { - query( + Query::new( + "post_repost_relationships", "MATCH (u:User {id: $author_id})-[:AUTHORED]->(p:Post {id: $post_id}) OPTIONAL MATCH (p)-[:REPOSTED]->(repost:Post)<-[:AUTHORED]-(repost_author:User) RETURN collect([ @@ -144,7 +146,8 @@ pub fn post_repost_relationships(author_id: &str, post_id: &str) -> Query { // Retrieve a post by id pub fn get_post_details_by_id(user_id: &str, post_id: &str) -> Query { - query( + Query::new( + "get_post_details_by_id", " MATCH (user:User {id: $user_id})-[:AUTHORED]->(post:Post {id: $post_id}) RETURN { diff --git a/nexus-watcher/tests/event_processor/tags/utils.rs b/nexus-watcher/tests/event_processor/tags/utils.rs index b0b00eb12..3f381909a 100644 --- a/nexus-watcher/tests/event_processor/tags/utils.rs +++ b/nexus-watcher/tests/event_processor/tags/utils.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use neo4rs::{query, Query}; +use nexus_common::db::graph::Query; use nexus_common::{ db::{fetch_key_from_graph, RedisOps}, models::{ @@ -56,7 +56,8 @@ pub async fn check_member_post_tag_global_timeline( // 
Retrieve post related tag fn post_tag_query(user_id: &str, post_id: &str, tag_name: &str) -> Query { - query( + Query::new( + "post_tag_query", " MATCH (u:User {id: $user_id})-[:AUTHORED]->(p:Post {id: $post_id})<-[t:TAGGED {label: $tag_name}]-(tagger:User) WITH COUNT(tagger) as count, COLLECT(tagger.id) as list, t.label as label @@ -74,7 +75,8 @@ fn post_tag_query(user_id: &str, post_id: &str, tag_name: &str) -> Query { // Retrieve post related tag fn user_tag_query(tagged_user_id: &str, tag_name: &str) -> Query { - query( + Query::new( + "user_tag_query", " MATCH (u:User {id: $tagged_user_id})<-[t:TAGGED {label: $tag_name}]-(tagger:User) WITH COUNT(tagger) as count, COLLECT(tagger.id) as list, t.label as label diff --git a/nexus-webapi/src/mock.rs b/nexus-webapi/src/mock.rs index 78f35660d..e1bc1d8c3 100644 --- a/nexus-webapi/src/mock.rs +++ b/nexus-webapi/src/mock.rs @@ -1,8 +1,7 @@ use crate::{api_context::ApiContextBuilder, NexusApiBuilder}; use clap::ValueEnum; -use neo4rs::query; use nexus_common::{ - db::{get_neo4j_graph, get_redis_conn, reindex}, + db::{get_neo4j_graph, get_redis_conn, graph::Query, reindex}, ApiConfig, }; use std::process::Stdio; @@ -19,9 +18,12 @@ pub enum MockType { pub struct MockDb {} impl MockDb { - pub async fn clear_database() { + /// Initialize the database stack for CLI db commands (no slow-query logging). 
+ async fn init_stack() { + let mut api_config = ApiConfig::default(); + api_config.stack.db.neo4j.slow_query_logging_enabled = false; let api_context = ApiContextBuilder::from_default_config_dir() - .api_config(ApiConfig::default()) + .api_config(api_config) .try_build() .await .expect("Failed to create ApiContext"); @@ -29,6 +31,10 @@ impl MockDb { .init_stack() .await .expect("Failed to initialize stack"); + } + + pub async fn clear_database() { + Self::init_stack().await; Self::drop_cache().await; Self::drop_graph().await; @@ -36,15 +42,7 @@ impl MockDb { } pub async fn run(mock_type: Option) { - let api_context = ApiContextBuilder::from_default_config_dir() - .api_config(ApiConfig::default()) - .try_build() - .await - .expect("Failed to create ApiContext"); - NexusApiBuilder(api_context) - .init_stack() - .await - .expect("Failed to initialize stack"); + Self::init_stack().await; match mock_type { Some(MockType::Redis) => Self::sync_redis().await, @@ -58,7 +56,7 @@ impl MockDb { let graph = get_neo4j_graph().expect("Failed to get Neo4j graph connection"); // drop and run the queries again - let drop_all_query = query("MATCH (n) DETACH DELETE n;"); + let drop_all_query = Query::new("drop_graph", "MATCH (n) DETACH DELETE n;"); graph .run(drop_all_query) .await diff --git a/nexusd/Cargo.toml b/nexusd/Cargo.toml index b1b590a94..82dfb6373 100644 --- a/nexusd/Cargo.toml +++ b/nexusd/Cargo.toml @@ -12,6 +12,7 @@ async-trait = { workspace = true } dirs = "6.0.0" chrono = { workspace = true } clap = { workspace = true, features = ["derive"] } +futures = { workspace = true } neo4rs = { workspace = true } nexus-webapi = { version = "0.4.1", path = "../nexus-webapi" } nexus-common = { version = "0.4.1", path = "../nexus-common" } diff --git a/nexusd/src/migrations/default.config.toml b/nexusd/src/migrations/default.config.toml index 07119850c..523cadd78 100644 --- a/nexusd/src/migrations/default.config.toml +++ b/nexusd/src/migrations/default.config.toml @@ -19,4 +19,10 
@@ redis = "redis://localhost:6379" uri = "bolt://localhost:7687" # Not needed in the Community Edition #user = "neo4j" -password = "12345678" \ No newline at end of file +password = "12345678" +# Queries taking longer than this (ms) will be logged as warnings +slow_query_logging_threshold_ms = 100 +# Disable slow query logging for CLI migration commands +slow_query_logging_enabled = false +# Include the Cypher query text in slow query log entries +#slow_query_logging_include_cypher = false diff --git a/nexusd/src/migrations/manager.rs b/nexusd/src/migrations/manager.rs index 30a88afc2..aa6f4705b 100644 --- a/nexusd/src/migrations/manager.rs +++ b/nexusd/src/migrations/manager.rs @@ -1,9 +1,13 @@ use async_trait::async_trait; use chrono::Utc; -use neo4rs::{Graph, Query}; -use nexus_common::{db::get_neo4j_graph, types::DynError}; +use futures::TryStreamExt; +use nexus_common::{ + db::{get_neo4j_graph, graph::Query, GraphOps}, + types::DynError, +}; use serde::{Deserialize, Serialize}; use std::any::Any; +use std::sync::Arc; use tracing::info; use crate::migrations::utils::{self, generate_template}; @@ -86,18 +90,18 @@ pub struct MigrationNode { const MIGRATION_PATH: &str = "nexusd/src/migrations/migrations_list/"; pub struct MigrationManager { - graph: Graph, + graph: Arc, migrations: Vec>, } impl Default for MigrationManager { fn default() -> Self { - let graph_connection = match get_neo4j_graph() { - Ok(connection) => connection, + let graph = match get_neo4j_graph() { + Ok(graph) => graph, Err(e) => panic!("Could not initialise migration manager: {e:?}"), }; Self { - graph: graph_connection, + graph, migrations: Vec::new(), } } @@ -215,10 +219,13 @@ impl MigrationManager { } async fn get_migrations(&self) -> Result, DynError> { - let query = Query::new("MATCH (m:Migration) RETURN COLLECT(m) as migrations".to_string()); + let query = Query::new( + "get_migrations", + "MATCH (m:Migration) RETURN COLLECT(m) as migrations", + ); let mut result = 
self.graph.execute(query).await.map_err(|e| e.to_string())?; - match result.next().await { + match result.try_next().await { Ok(row) => match row { Some(row) => match row.get::>("migrations") { Ok(migrations) => Ok(migrations), @@ -236,8 +243,8 @@ impl MigrationManager { false => MigrationPhase::Backfill, }; let query = Query::new( - "MERGE (m:Migration {id: $id, phase: $phase, created_at: timestamp(), updated_at: 0})" - .to_string(), + "store_migration", + "MERGE (m:Migration {id: $id, phase: $phase, created_at: timestamp(), updated_at: 0})", ) .param("id", id) .param("phase", initial_phase.to_string()); @@ -252,8 +259,8 @@ impl MigrationManager { phase: &MigrationPhase, ) -> Result<(), DynError> { let query = Query::new( - "MERGE (m:Migration {id: $id}) SET m.phase = $phase, m.updated_at = timestamp()" - .to_string(), + "update_migration_phase", + "MERGE (m:Migration {id: $id}) SET m.phase = $phase, m.updated_at = timestamp()", ) .param("id", id) .param("phase", phase.to_string()); diff --git a/nexusd/src/migrations/migrations_list/remove_muted_1771718400.rs b/nexusd/src/migrations/migrations_list/remove_muted_1771718400.rs index 7e03578d4..cb84b66d8 100644 --- a/nexusd/src/migrations/migrations_list/remove_muted_1771718400.rs +++ b/nexusd/src/migrations/migrations_list/remove_muted_1771718400.rs @@ -1,7 +1,8 @@ use async_trait::async_trait; +use futures::StreamExt; use crate::migrations::{manager::Migration, utils::delete_keys_by_pattern}; -use nexus_common::{db::get_neo4j_graph, types::DynError}; +use nexus_common::{db::get_neo4j_graph, db::graph::Query, types::DynError}; use tracing::info; pub struct RemoveMuted1771718400; @@ -26,13 +27,15 @@ impl Migration for RemoveMuted1771718400 { let mut total_deleted: i64 = 0; loop { - let query = neo4rs::query( + let query = Query::new( + "remove_muted_batch", "MATCH ()-[r:MUTED]->() WITH r LIMIT 10000 DELETE r RETURN count(r) AS deleted", ); let mut result = graph.execute(query).await?; - let deleted: i64 = match 
result.next().await? { - Some(row) => row.get("deleted").unwrap_or(0), + let deleted: i64 = match result.next().await { + Some(Ok(row)) => row.get::("deleted").unwrap_or(0), + Some(Err(e)) => return Err(e.into()), None => 0, };