From d96ceebc0adefca0e38dbebb87c64a16b5caff33 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 16 Jan 2026 10:27:06 +0000 Subject: [PATCH] fix: prevent duplicate entries in Redis lists during re-indexing Modified lists::put to use LREM + RPUSH pattern in a pipeline to prevent duplicate entries when re-indexing operations are performed. For each value being added: 1. LREM removes all existing occurrences from the list (count=0 removes all) 2. RPUSH appends the value to the end of the list This approach: - Prevents duplicates during re-indexing operations - Makes the operation idempotent (calling multiple times = calling once) - Uses atomic pipeline execution (no Lua scripts required) - Preserves append semantics (new values added to end) Note: If a value already exists, it will be removed from its original position and re-added at the end of the list. Files changed: - nexus-common/src/db/kv/index/lists.rs: Updated put() implementation - nexus-common/src/db/kv/traits.rs: Removed TODO warning about unsafe re-indexing --- nexus-common/src/db/kv/index/lists.rs | 19 +++++++++++++++---- nexus-common/src/db/kv/traits.rs | 5 +---- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/nexus-common/src/db/kv/index/lists.rs b/nexus-common/src/db/kv/index/lists.rs index 082ea0524..16776077c 100644 --- a/nexus-common/src/db/kv/index/lists.rs +++ b/nexus-common/src/db/kv/index/lists.rs @@ -2,10 +2,12 @@ use crate::db::get_redis_conn; use crate::types::DynError; use deadpool_redis::redis::AsyncCommands; -/// Adds elements to a Redis list. +/// Adds elements to a Redis list with deduplication. /// -/// This function appends elements to the specified Redis list. If the list doesn't exist, -/// it creates a new list. +/// This function appends elements to the specified Redis list while ensuring no duplicates. +/// For each value, it first removes any existing occurrences, then appends the value to the end. +/// This makes the operation idempotent - calling it multiple times with the same value results +/// in only one occurrence at the end of the list. /// /// # Arguments /// @@ -22,7 +24,16 @@ pub async fn put(prefix: &str, key: &str, values: &[&str]) -> Result<(), DynErro } let index_key = format!("{prefix}:{key}"); let mut redis_conn = get_redis_conn().await?; - let _: () = redis_conn.rpush(index_key, values).await?; + + // Use a pipeline to remove duplicates then append each value + // LREM removes all existing occurrences (0 = all), then RPUSH appends to end + // This is atomic from the perspective of other Redis clients + let mut pipe = deadpool_redis::redis::pipe(); + for value in values { + pipe.lrem(&index_key, 0, value).rpush(&index_key, value); + } + let _: () = pipe.query_async(&mut redis_conn).await?; + Ok(()) } diff --git a/nexus-common/src/db/kv/traits.rs b/nexus-common/src/db/kv/traits.rs index 33e949a92..d5d293be3 100644 --- a/nexus-common/src/db/kv/traits.rs +++ b/nexus-common/src/db/kv/traits.rs @@ -244,15 +244,12 @@ pub trait RedisOps: Serialize + DeserializeOwned + Send + Sync { let prefix = Self::prefix().await; let key = key_parts.join(":"); - // TODO: Unsafe. If re-indexed it will duplicate follower/following list entries. - // Need reading, matching out the duplicates then storing. Inneficient. - // Needs mode safety for double-write. - // Directly use the string representations of items without additional serialization let collection = self.as_ref(); let values: Vec<&str> = collection.iter().map(|item| item.as_ref()).collect(); // Store the values in the Redis list + // Note: lists::put uses LREM + RPUSH to prevent duplicates during re-indexing lists::put(&prefix, &key, &values).await }