Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ pub async fn cleanup_job(pool: &PgPool, job_name: &str) -> Result<(), VectorizeE
// Delete pending PGMQ messages for this job
// We search for messages where the job_name matches
let delete_messages_query =
"DELETE FROM pgmq.vectorize_jobs WHERE message->>'job_name' = $1".to_string();
"DELETE FROM pgmq.q_vectorize_jobs WHERE message->>'job_name' = $1".to_string();
match sqlx::query(&delete_messages_query)
.bind(job_name)
.execute(pool)
Expand Down
26 changes: 9 additions & 17 deletions core/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -731,8 +731,8 @@ pub fn hybrid_search_query(
}

format!(
"
SELECT to_jsonb(t) as results
"
SELECT to_jsonb(t) as results
FROM (
SELECT {cols}, t.rrf_score, t.semantic_rank, t.fts_rank, t.similarity_score
FROM (
Expand All @@ -742,21 +742,14 @@ pub fn hybrid_search_query(
s.similarity_score,
f.fts_rank,
(
CASE
WHEN s.semantic_rank IS NOT NULL THEN {semantic_weight}::float/({rrf_k} + s.semantic_rank)
ELSE 0
END +
CASE
WHEN f.fts_rank IS NOT NULL THEN {fts_weight}::float/({rrf_k} + f.fts_rank)
ELSE 0
END
COALESCE({semantic_weight}::float / ({rrf_k} + s.semantic_rank), 0) +
COALESCE({fts_weight}::float / ({rrf_k} + f.fts_rank), 0)
) as rrf_score
FROM (
SELECT
{join_key},
distance,
ROW_NUMBER() OVER (ORDER BY distance) as semantic_rank,
COUNT(*) OVER () as max_semantic_rank,
1 - distance as similarity_score
FROM (
SELECT
Expand All @@ -770,17 +763,16 @@ pub fn hybrid_search_query(
FULL OUTER JOIN (
SELECT
{join_key},
ROW_NUMBER() OVER (ORDER BY ts_rank_cd(search_tokens, query) DESC) as fts_rank,
COUNT(*) OVER () as max_fts_rank
FROM vectorize._search_tokens_{job_name},
to_tsquery('english',
ROW_NUMBER() OVER (ORDER BY ts_rank_cd(search_tokens, query) DESC) as fts_rank
FROM vectorize._search_tokens_{job_name},
to_tsquery('english',
NULLIF(
replace(plainto_tsquery('english', $2)::text, ' & ', ' | '),
''
)
) as query
WHERE search_tokens @@ query
ORDER BY ts_rank_cd(search_tokens, query) DESC
ORDER BY ts_rank_cd(search_tokens, query) DESC
LIMIT {window_size}
) f ON s.{join_key} = f.{join_key}
) t
Expand All @@ -789,7 +781,7 @@ pub fn hybrid_search_query(
ORDER BY t.rrf_score DESC
LIMIT {limit}
) t"
)
)
}
#[cfg(test)]
mod tests {
Expand Down
4 changes: 4 additions & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ version = "0.1.0"
edition = "2024"
publish = false

[[bin]]
name = "vectorize-worker"
path = "src/bin/worker.rs"

[lib]
name = "vectorize_server"
path = "src/lib.rs"
Expand Down
4 changes: 3 additions & 1 deletion server/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ COPY Cargo.toml Cargo.lock ./

ENV SQLX_OFFLINE=1
RUN cargo build --bin vectorize-server --release

RUN cargo build --bin vectorize-worker --release

FROM rust:1.90.0-slim-bookworm

RUN apt-get update && \
apt-get install -y postgresql-client && apt-get clean && \
rm -rf /var/lib/apt/lists/* /var/cache/apt/archives/*

COPY --from=builder /build/target/release/vectorize-server /usr/local/bin/vectorize-server
COPY --from=builder /build/target/release/vectorize-worker /usr/local/bin/vectorize-worker

CMD ["vectorize-server"]
18 changes: 2 additions & 16 deletions server/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1098,21 +1098,7 @@ async fn test_delete_job_with_pending_messages() {
let cfg = vectorize_core::config::Config::from_env();
let pool = sqlx::PgPool::connect(&cfg.database_url).await.unwrap();

let mut rng = rand::rng();
let test_num = rng.random_range(1..100000);
let table = format!("test_pending_msgs_{test_num}");

// Create table
sqlx::query(&format!(
"CREATE TABLE IF NOT EXISTS vectorize_test.{table} (
id SERIAL PRIMARY KEY,
content TEXT,
updated_at TIMESTAMPTZ DEFAULT NOW()
);"
))
.execute(&pool)
.await
.unwrap();
let table = common::create_test_table().await;

// Insert multiple rows
for i in 0..10 {
Expand All @@ -1125,7 +1111,7 @@ async fn test_delete_job_with_pending_messages() {
.unwrap();
}

let job_name = format!("test_pending_{test_num}");
let job_name = format!("test_pending_{}", table);

// Create a vectorize job
let payload = json!({
Expand Down