Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions nexus-common/src/db/kv/index/lists.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ use crate::db::get_redis_conn;
use crate::types::DynError;
use deadpool_redis::redis::AsyncCommands;

/// Adds elements to a Redis list.
/// Adds elements to a Redis list with deduplication.
///
/// This function appends elements to the specified Redis list. If the list doesn't exist,
/// it creates a new list.
/// This function appends elements to the specified Redis list while ensuring no duplicates.
/// For each value, it first removes any existing occurrences, then appends the value to the end.
/// This makes the operation idempotent - calling it multiple times with the same value results
/// in only one occurrence at the end of the list.
///
/// # Arguments
///
Expand All @@ -22,7 +24,16 @@ pub async fn put(prefix: &str, key: &str, values: &[&str]) -> Result<(), DynErro
}
let index_key = format!("{prefix}:{key}");
let mut redis_conn = get_redis_conn().await?;
let _: () = redis_conn.rpush(index_key, values).await?;

// Use a pipeline to remove duplicates then append each value
// LREM removes all existing occurrences (0 = all), then RPUSH appends to end
// This is atomic from the perspective of other Redis clients
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That comment is not strictly correct in Redis terms:

  • A pipeline sends multiple commands in one batch over the same connection, reducing round trips.
  • But it does not make them atomic. Other clients’ commands can still be interleaved between these commands on the server.

To get true atomicity in Redis, you’d need MULTI / EXEC (transaction) or a Lua script (EVAL).

let mut pipe = deadpool_redis::redis::pipe();
for value in values {
pipe.lrem(&index_key, 0, value).rpush(&index_key, value);
}
let _: () = pipe.query_async(&mut redis_conn).await?;

Ok(())
}

Expand Down
5 changes: 1 addition & 4 deletions nexus-common/src/db/kv/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,12 @@ pub trait RedisOps: Serialize + DeserializeOwned + Send + Sync {
let prefix = Self::prefix().await;
let key = key_parts.join(":");

// TODO: Unsafe. If re-indexed it will duplicate follower/following list entries.
// Need reading, matching out the duplicates then storing. Inneficient.
// Needs mode safety for double-write.

// Directly use the string representations of items without additional serialization
let collection = self.as_ref();
let values: Vec<&str> = collection.iter().map(|item| item.as_ref()).collect();

// Store the values in the Redis list
// Note: lists::put uses LREM + RPUSH to prevent duplicates during re-indexing
lists::put(&prefix, &key, &values).await
}

Expand Down