diff --git a/core/src/init.rs b/core/src/init.rs index 94525df..1b20ca3 100644 --- a/core/src/init.rs +++ b/core/src/init.rs @@ -330,7 +330,7 @@ pub async fn cleanup_job(pool: &PgPool, job_name: &str) -> Result<(), VectorizeE // Delete pending PGMQ messages for this job // We search for messages where the job_name matches let delete_messages_query = - "DELETE FROM pgmq.vectorize_jobs WHERE message->>'job_name' = $1".to_string(); + "DELETE FROM pgmq.q_vectorize_jobs WHERE message->>'job_name' = $1".to_string(); match sqlx::query(&delete_messages_query) .bind(job_name) .execute(pool) diff --git a/core/src/query.rs b/core/src/query.rs index 1ebbbf9..20afdb0 100644 --- a/core/src/query.rs +++ b/core/src/query.rs @@ -731,8 +731,8 @@ pub fn hybrid_search_query( } format!( - " - SELECT to_jsonb(t) as results + " + SELECT to_jsonb(t) as results FROM ( SELECT {cols}, t.rrf_score, t.semantic_rank, t.fts_rank, t.similarity_score FROM ( @@ -742,21 +742,14 @@ pub fn hybrid_search_query( s.similarity_score, f.fts_rank, ( - CASE - WHEN s.semantic_rank IS NOT NULL THEN {semantic_weight}::float/({rrf_k} + s.semantic_rank) - ELSE 0 - END + - CASE - WHEN f.fts_rank IS NOT NULL THEN {fts_weight}::float/({rrf_k} + f.fts_rank) - ELSE 0 - END + COALESCE({semantic_weight}::float / ({rrf_k} + s.semantic_rank), 0) + + COALESCE({fts_weight}::float / ({rrf_k} + f.fts_rank), 0) ) as rrf_score FROM ( SELECT {join_key}, distance, ROW_NUMBER() OVER (ORDER BY distance) as semantic_rank, - COUNT(*) OVER () as max_semantic_rank, 1 - distance as similarity_score FROM ( SELECT @@ -770,17 +763,16 @@ pub fn hybrid_search_query( FULL OUTER JOIN ( SELECT {join_key}, - ROW_NUMBER() OVER (ORDER BY ts_rank_cd(search_tokens, query) DESC) as fts_rank, - COUNT(*) OVER () as max_fts_rank - FROM vectorize._search_tokens_{job_name}, - to_tsquery('english', + ROW_NUMBER() OVER (ORDER BY ts_rank_cd(search_tokens, query) DESC) as fts_rank + FROM vectorize._search_tokens_{job_name}, + to_tsquery('english', NULLIF( replace(plainto_tsquery('english', $2)::text, ' & ', ' | '), '' ) ) as query WHERE search_tokens @@ query - ORDER BY ts_rank_cd(search_tokens, query) DESC + ORDER BY ts_rank_cd(search_tokens, query) DESC LIMIT {window_size} ) f ON s.{join_key} = f.{join_key} ) t @@ -789,7 +781,7 @@ pub fn hybrid_search_query( ORDER BY t.rrf_score DESC LIMIT {limit} ) t" -) + ) } #[cfg(test)] mod tests { diff --git a/server/Cargo.toml b/server/Cargo.toml index bf13d92..5efbcc6 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -4,6 +4,10 @@ version = "0.1.0" edition = "2024" publish = false +[[bin]] +name = "vectorize-worker" +path = "src/bin/worker.rs" + [lib] name = "vectorize_server" path = "src/lib.rs" diff --git a/server/Dockerfile b/server/Dockerfile index 113b1f4..493c512 100644 --- a/server/Dockerfile +++ b/server/Dockerfile @@ -15,7 +15,8 @@ COPY Cargo.toml Cargo.lock ./ ENV SQLX_OFFLINE=1 RUN cargo build --bin vectorize-server --release - +RUN cargo build --bin vectorize-worker --release + FROM rust:1.90.0-slim-bookworm RUN apt-get update && \ @@ -23,5 +24,6 @@ RUN apt-get update && \ rm -rf /var/lib/apt/lists/* /var/cache/apt/archives/* COPY --from=builder /build/target/release/vectorize-server /usr/local/bin/vectorize-server +COPY --from=builder /build/target/release/vectorize-worker /usr/local/bin/vectorize-worker CMD ["vectorize-server"] \ No newline at end of file diff --git a/server/tests/tests.rs b/server/tests/tests.rs index 2e9e9ca..7e477eb 100644 --- a/server/tests/tests.rs +++ b/server/tests/tests.rs @@ -1098,21 +1098,7 @@ async fn test_delete_job_with_pending_messages() { let cfg = vectorize_core::config::Config::from_env(); let pool = sqlx::PgPool::connect(&cfg.database_url).await.unwrap(); - let mut rng = rand::rng(); - let test_num = rng.random_range(1..100000); - let table = format!("test_pending_msgs_{test_num}"); - - // Create table - sqlx::query(&format!( - "CREATE TABLE IF NOT EXISTS vectorize_test.{table} ( - id SERIAL PRIMARY KEY, - content TEXT, - updated_at TIMESTAMPTZ DEFAULT NOW() - );" - )) - .execute(&pool) - .await - .unwrap(); + let table = common::create_test_table().await; // Insert multiple rows for i in 0..10 { @@ -1125,7 +1111,7 @@ async fn test_delete_job_with_pending_messages() { .unwrap(); } - let job_name = format!("test_pending_{test_num}"); + let job_name = format!("test_pending_{}", table); // Create a vectorize job let payload = json!({