diff --git a/nexus-common/src/db/kv/index/lists.rs b/nexus-common/src/db/kv/index/lists.rs index 082ea0524..16776077c 100644 --- a/nexus-common/src/db/kv/index/lists.rs +++ b/nexus-common/src/db/kv/index/lists.rs @@ -2,10 +2,12 @@ use crate::db::get_redis_conn; use crate::types::DynError; use deadpool_redis::redis::AsyncCommands; -/// Adds elements to a Redis list. +/// Adds elements to a Redis list with deduplication. /// -/// This function appends elements to the specified Redis list. If the list doesn't exist, -/// it creates a new list. +/// This function appends elements to the specified Redis list while ensuring no duplicates. +/// For each value, it first removes any existing occurrences, then appends the value to the end. +/// This makes the operation idempotent - calling it multiple times with the same value results +/// in only one occurrence at the end of the list. /// /// # Arguments /// @@ -22,7 +24,16 @@ pub async fn put(prefix: &str, key: &str, values: &[&str]) -> Result<(), DynErro } let index_key = format!("{prefix}:{key}"); let mut redis_conn = get_redis_conn().await?; - let _: () = redis_conn.rpush(index_key, values).await?; + + // Use a pipeline to remove duplicates then append each value + // LREM removes all existing occurrences (0 = all), then RPUSH appends to end + // This is atomic from the perspective of other Redis clients + let mut pipe = deadpool_redis::redis::pipe(); + for value in values { + pipe.lrem(&index_key, 0, value).rpush(&index_key, value); + } + let _: () = pipe.query_async(&mut redis_conn).await?; + Ok(()) } diff --git a/nexus-common/src/db/kv/traits.rs b/nexus-common/src/db/kv/traits.rs index 33e949a92..d5d293be3 100644 --- a/nexus-common/src/db/kv/traits.rs +++ b/nexus-common/src/db/kv/traits.rs @@ -244,15 +244,12 @@ pub trait RedisOps: Serialize + DeserializeOwned + Send + Sync { let prefix = Self::prefix().await; let key = key_parts.join(":"); - // TODO: Unsafe. If re-indexed it will duplicate follower/following list entries. - // Need reading, matching out the duplicates then storing. Inneficient. - // Needs mode safety for double-write. - // Directly use the string representations of items without additional serialization let collection = self.as_ref(); let values: Vec<&str> = collection.iter().map(|item| item.as_ref()).collect(); // Store the values in the Redis list + // Note: lists::put uses LREM + RPUSH to prevent duplicates during re-indexing lists::put(&prefix, &key, &values).await }