Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
857255c
slow graph query logger
aintnostressin Feb 25, 2026
9b42ae2
interpolate only if logged
aintnostressin Feb 26, 2026
de6c027
add query labels
aintnostressin Feb 26, 2026
47b0a0f
measure row consume time
aintnostressin Feb 26, 2026
2e4bcb1
add comment about execute measure
aintnostressin Feb 26, 2026
67ddad0
merge time
aintnostressin Feb 26, 2026
529f7af
remove txns
aintnostressin Feb 27, 2026
518efc9
Merge branch 'main' into feat/slow-graph-query-logger
aintnostressin Mar 1, 2026
1deaa59
remove redundant to_string()
aintnostressin Mar 1, 2026
b7278ce
consistent import path
aintnostressin Mar 1, 2026
fb5d900
remove redundant map_err
aintnostressin Mar 1, 2026
05c281c
toml formatting
aintnostressin Mar 1, 2026
96e9782
add config comments
aintnostressin Mar 1, 2026
b56c2f9
fix
aintnostressin Mar 2, 2026
fbba68d
disable logging in migrations and mock
aintnostressin Mar 2, 2026
90bbb46
reduce visibility
aintnostressin Mar 2, 2026
3a49318
static str instead of String
aintnostressin Mar 2, 2026
4a50103
query module private
aintnostressin Mar 2, 2026
69e1cb5
measure wall clock time
aintnostressin Mar 2, 2026
aa71221
log slow queries cypher
aintnostressin Mar 3, 2026
baff756
Merge branch 'main' into feat/slow-graph-query-logger
aintnostressin Mar 3, 2026
82caca3
fix clippy PI warning
aintnostressin Mar 3, 2026
d920712
fmt
aintnostressin Mar 4, 2026
dad6341
extract query label and cypher to vars
aintnostressin Mar 4, 2026
ff0ae39
GraphExec -> GraphOps
aintnostressin Mar 4, 2026
7ed22b6
add comment
aintnostressin Mar 4, 2026
5e622c7
dynamic query post stream label
aintnostressin Mar 4, 2026
b405147
reused start
aintnostressin Mar 4, 2026
e7d36b0
rename config var names
aintnostressin Mar 4, 2026
390f12d
missing default config
aintnostressin Mar 4, 2026
a45a4d9
query fn test only
aintnostressin Mar 4, 2026
a538b94
consistent cypher logging
aintnostressin Mar 4, 2026
d2c12d1
revert label and cypher vars
aintnostressin Mar 4, 2026
8684e08
add TracedGraph unit tests
aintnostressin Mar 5, 2026
0b2272b
Merge branch 'main' into feat/slow-graph-query-logger
aintnostressin Mar 5, 2026
a0c798e
chore: fix query.param() indentation
ok300 Mar 5, 2026
9dae547
Update nexus-common/src/db/graph/traced.rs
aintnostressin Mar 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,5 @@ thiserror = "2.0.17"
tokio = { version = "1.49.0", features = ["full"] }
tokio-shared-rt = "0.1"
tracing = "0.1.44"
tracing-test = "0.2"
futures = "0.3"
8 changes: 7 additions & 1 deletion examples/api/api-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,10 @@ redis = "redis://127.0.0.1:6379"
uri = "bolt://localhost:7687"
# Not needed in the Community Edition
#user = "neo4j"
password = "12345678"
password = "12345678"
# Queries taking longer than this (ms) will be logged as warnings
slow_query_logging_threshold_ms = 100
# Enable or disable slow query logging
#slow_query_logging_enabled = true
# Include the full cypher (with interpolated params) in slow-query warnings
#slow_query_logging_include_cypher = false
6 changes: 6 additions & 0 deletions examples/watcher/watcher-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,9 @@ uri = "bolt://localhost:7687"
# Not needed in the Community Edition
#user = "neo4j"
password = "12345678"
# Queries taking longer than this (ms) will be logged as warnings
slow_query_logging_threshold_ms = 100
# Enable or disable slow query logging
#slow_query_logging_enabled = true
# Include the full cypher (with interpolated params) in slow-query warnings
#slow_query_logging_include_cypher = false
2 changes: 2 additions & 0 deletions nexus-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ license = "MIT"
async-trait = { workspace = true }
dirs = "6.0.0"
chrono = { workspace = true }
futures = { workspace = true }
neo4rs = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry-appender-tracing = "0.31.1"
Expand All @@ -33,3 +34,4 @@ utoipa = "5.4.0"
[dev-dependencies]
tempfile = { workspace = true }
tokio-shared-rt = { workspace = true }
tracing-test = { workspace = true }
6 changes: 6 additions & 0 deletions nexus-common/default.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,9 @@ uri = "bolt://localhost:7687"
# Not needed in the Community Edition the profile username, just the password
#user = "neo4j"
password = "12345678"
# Queries taking longer than this (ms) will be logged as warnings
slow_query_logging_threshold_ms = 100
# Enable or disable slow query logging
slow_query_logging_enabled = true
# Include the Cypher query text in slow query log entries
#slow_query_logging_include_cypher = false
2 changes: 1 addition & 1 deletion nexus-common/src/db/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
use std::fmt::Debug;

mod neo4j;
pub use neo4j::Neo4JConfig;
pub use neo4j::{Neo4JConfig, DEFAULT_SLOW_QUERY_THRESHOLD_MS};

pub const REDIS_URI: &str = "redis://localhost:6379";

Expand Down
29 changes: 29 additions & 0 deletions nexus-common/src/db/config/neo4j.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,55 @@ use serde::{Deserialize, Serialize};
pub const NEO4J_URI: &str = "bolt://localhost:7687";
pub const NEO4J_USER: &str = "neo4j";
pub const NEO4J_PASS: &str = "12345678";
pub const DEFAULT_SLOW_QUERY_THRESHOLD_MS: u64 = 100;

// Create temporal struct to wrap database config
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct Neo4JConfig {
pub uri: String,

#[serde(default = "default_neo4j_user")]
pub user: String,

pub password: String,

/// Queries exceeding this threshold (in milliseconds) are logged as warnings.
/// Only used when `slow_query_logging_enabled` is true.
#[serde(default = "default_slow_query_logging_threshold_ms")]
pub slow_query_logging_threshold_ms: u64,

/// Enable slow-query logging. Defaults to true.
/// Set to false for CLI/admin commands where tracing overhead is unnecessary.
#[serde(default = "default_slow_query_logging_enabled")]
pub slow_query_logging_enabled: bool,

/// Include the full cypher (with interpolated params) in slow-query warnings.
/// Useful for debugging but verbose. Defaults to false.
#[serde(default)]
pub slow_query_logging_include_cypher: bool,
}

fn default_neo4j_user() -> String {
String::from("neo4j")
}

fn default_slow_query_logging_threshold_ms() -> u64 {
DEFAULT_SLOW_QUERY_THRESHOLD_MS
}

fn default_slow_query_logging_enabled() -> bool {
true
}

impl Default for Neo4JConfig {
fn default() -> Self {
Self {
uri: String::from(NEO4J_URI),
user: String::from(NEO4J_USER),
password: String::from(NEO4J_PASS),
slow_query_logging_threshold_ms: DEFAULT_SLOW_QUERY_THRESHOLD_MS,
slow_query_logging_enabled: true,
slow_query_logging_include_cypher: false,
}
}
}
42 changes: 25 additions & 17 deletions nexus-common/src/db/connectors/neo4j.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,23 @@
use neo4rs::{query, Graph};
use crate::db::graph::Query;
use std::fmt;
use std::sync::OnceLock;
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use tracing::{debug, info};

use crate::db::graph::error::{GraphError, GraphResult};
use crate::db::graph::{Graph, GraphOps, TracedGraph};
use crate::db::setup::setup_graph;
use crate::db::Neo4JConfig;
use crate::types::DynError;

pub struct Neo4jConnector {
pub graph: Graph,
graph: Arc<dyn GraphOps>,
}

impl Neo4jConnector {
/// Initialize and register the global Neo4j connector and verify connectivity
pub async fn init(neo4j_config: &Neo4JConfig) -> Result<(), DynError> {
let neo4j_connector = Neo4jConnector::new_connection(
&neo4j_config.uri,
&neo4j_config.user,
&neo4j_config.password,
)
.await?;
let neo4j_connector = Neo4jConnector::new_connection(neo4j_config).await?;

neo4j_connector.ping(&neo4j_config.uri).await?;

Expand All @@ -35,17 +32,28 @@ impl Neo4jConnector {
}

/// Create and return a new connector after defining a database connection
async fn new_connection(uri: &str, user: &str, password: &str) -> GraphResult<Self> {
let graph = Graph::new(uri, user, password).await?;
let neo4j_connector = Neo4jConnector { graph };
info!("Created Neo4j connector");
async fn new_connection(config: &Neo4JConfig) -> GraphResult<Self> {
let neo4j_graph = neo4rs::Graph::new(&config.uri, &config.user, &config.password).await?;
let graph = Graph::new(neo4j_graph);

let graph: Arc<dyn GraphOps> = if config.slow_query_logging_enabled {
let threshold = Duration::from_millis(config.slow_query_logging_threshold_ms);
Arc::new(
TracedGraph::new(graph)
.with_slow_query_threshold(threshold)
.with_log_cypher(config.slow_query_logging_include_cypher),
)
} else {
Arc::new(graph)
};

Ok(neo4j_connector)
info!("Created Neo4j connector");
Ok(Neo4jConnector { graph })
}

/// Perform a health-check PING over the Bolt protocol to the Neo4j server
async fn ping(&self, neo4j_uri: &str) -> Result<(), DynError> {
if let Err(neo4j_err) = self.graph.execute(query("RETURN 1")).await {
if let Err(neo4j_err) = self.graph.run(Query::new("ping", "RETURN 1")).await {
return Err(format!("Failed to PING to Neo4j at {neo4j_uri}, {neo4j_err}").into());
}

Expand All @@ -57,13 +65,13 @@ impl Neo4jConnector {
impl fmt::Debug for Neo4jConnector {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Neo4jConnector")
.field("graph", &"Graph instance")
.field("graph", &"GraphOps instance")
.finish()
}
}

/// Helper to retrieve a Neo4j graph connection.
pub fn get_neo4j_graph() -> GraphResult<Graph> {
pub fn get_neo4j_graph() -> GraphResult<Arc<dyn GraphOps>> {
NEO4J_CONNECTOR
.get()
.ok_or(GraphError::ConnectionNotInitialized)
Expand Down
17 changes: 7 additions & 10 deletions nexus-common/src/db/graph/exec.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use super::query::Query;
use crate::db::{get_neo4j_graph, graph::error::GraphResult};
use neo4rs::{Query, Row};
use futures::TryStreamExt;
use neo4rs::Row;
use serde::de::DeserializeOwned;

/// Represents the outcome of a mutation-like query in the graph database.
Expand Down Expand Up @@ -37,7 +39,7 @@ pub async fn execute_graph_operation(query: Query) -> GraphResult<OperationOutco
pub async fn exec_single_row(query: Query) -> GraphResult<()> {
let graph = get_neo4j_graph()?;
let mut result = graph.execute(query).await?;
result.next().await?;
result.try_next().await?;
Ok(())
}

Expand All @@ -46,20 +48,15 @@ pub async fn fetch_row_from_graph(query: Query) -> GraphResult<Option<Row>> {

let mut result = graph.execute(query).await?;

result.next().await.map_err(Into::into)
result.try_next().await.map_err(Into::into)
}

pub async fn fetch_all_rows_from_graph(query: Query) -> GraphResult<Vec<Row>> {
let graph = get_neo4j_graph()?;

let mut result = graph.execute(query).await?;
let mut rows = Vec::new();

while let Some(row) = result.next().await? {
rows.push(row);
}
let result = graph.execute(query).await?;

Ok(rows)
result.try_collect().await.map_err(Into::into)
}

/// Fetch the value of type T mapped to a specific key from the first row of a graph query's result
Expand Down
5 changes: 5 additions & 0 deletions nexus-common/src/db/graph/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
pub mod error;
pub mod exec;
pub mod queries;
mod query;
pub mod setup;
mod traced;

pub use error::{GraphError, GraphResult};
pub use query::Query;
pub use traced::GraphOps;
pub(crate) use traced::{Graph, TracedGraph};
22 changes: 14 additions & 8 deletions nexus-common/src/db/graph/queries/del.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use neo4rs::{query, Query};
use crate::db::graph::Query;

/// Deletes a user node and all its relationships
/// # Arguments
/// * `user_id` - The unique identifier of the user to be deleted
pub fn delete_user(user_id: &str) -> Query {
query(
Query::new(
"delete_user",
"MATCH (u:User {id: $id})
DETACH DELETE u;",
)
Expand All @@ -16,7 +17,8 @@ pub fn delete_user(user_id: &str) -> Query {
/// * `author_id` - The unique identifier of the user who authored the post.
/// * `post_id` - The unique identifier of the post to be deleted.
pub fn delete_post(author_id: &str, post_id: &str) -> Query {
query(
Query::new(
"delete_post",
"MATCH (u:User {id: $author_id})-[:AUTHORED]->(p:Post {id: $post_id})
DETACH DELETE p;",
)
Expand All @@ -29,11 +31,12 @@ pub fn delete_post(author_id: &str, post_id: &str) -> Query {
/// * `follower_id` - The unique identifier of the user who is following another user.
/// * `followee_id` - The unique identifier of the user being followed
pub fn delete_follow(follower_id: &str, followee_id: &str) -> Query {
query(
Query::new(
"delete_follow",
"// Important that MATCH to check if both users are in the graph
MATCH (follower:User {id: $follower_id}), (followee:User {id: $followee_id})
// Check if follow already exist
OPTIONAL MATCH (follower)-[existing:FOLLOWS]->(followee)
OPTIONAL MATCH (follower)-[existing:FOLLOWS]->(followee)
DELETE existing
// Returns true if the relationship does not exist as 'flag'
RETURN existing IS NULL AS flag;",
Expand All @@ -47,7 +50,8 @@ pub fn delete_follow(follower_id: &str, followee_id: &str) -> Query {
/// * `user_id` - The unique identifier of the user who created the bookmark.
/// * `bookmark_id` - The unique identifier of the bookmark relationship to be deleted.
pub fn delete_bookmark(user_id: &str, bookmark_id: &str) -> Query {
query(
Query::new(
"delete_bookmark",
"MATCH (u:User {id: $user_id})-[b:BOOKMARKED {id: $bookmark_id}]->(post:Post)<-[:AUTHORED]-(author:User)
WITH post.id as post_id, author.id as author_id, b
DELETE b
Expand All @@ -62,7 +66,8 @@ pub fn delete_bookmark(user_id: &str, bookmark_id: &str) -> Query {
/// * `user_id` - The unique identifier of the user who created the tag.
/// * `tag_id` - The unique identifier of the `TAGGED` relationship to be deleted.
pub fn delete_tag(user_id: &str, tag_id: &str) -> Query {
query(
Query::new(
"delete_tag",
"MATCH (user:User {id: $user_id})-[tag:TAGGED {id: $tag_id}]->(target)
OPTIONAL MATCH (target)<-[:AUTHORED]-(author:User)
WITH CASE WHEN target:User THEN target.id ELSE null END AS user_id,
Expand All @@ -82,7 +87,8 @@ pub fn delete_tag(user_id: &str, tag_id: &str) -> Query {
/// * `owner_id` - The unique identifier of the user who owns the file
/// * `file_id` - The unique identifier of the file to be deleted
pub fn delete_file(owner_id: &str, file_id: &str) -> Query {
query(
Query::new(
"delete_file",
"MATCH (f:File {id: $id, owner_id: $owner_id})
DETACH DELETE f;",
)
Expand Down
Loading