diff --git a/.claude/commands/downvote-review.md b/.claude/commands/downvote-review.md
new file mode 100644
index 0000000..26a83d5
--- /dev/null
+++ b/.claude/commands/downvote-review.md
@@ -0,0 +1,153 @@
+# Downvote Review
+
+Review downvoted snippets, hide them, create corrective KB entries, and generate a report.
+
+## Steps
+
+### 1. Find unreviewed downvoted snippets
+
+Use the Supabase MCP to query for downvoted snippets that haven't been processed yet:
+
+```sql
+SELECT
+  s.id AS snippet_id,
+  s.title,
+  s.explanation,
+  s.disinformation_categories,
+  s.confidence_scores,
+  s.created_at,
+  drq.status AS queue_status,
+  COUNT(uls.id) FILTER (WHERE uls.value = -1) AS downvote_count
+FROM snippets s
+JOIN user_like_snippets uls ON uls.snippet = s.id
+LEFT JOIN downvote_review_queue drq ON drq.snippet_id = s.id
+WHERE uls.value = -1
+AND (drq.status IS NULL OR drq.status = 'pending' OR drq.status = 'error')
+AND NOT EXISTS (
+  SELECT 1 FROM user_hide_snippets uhs WHERE uhs.snippet = s.id
+)
+GROUP BY s.id, drq.status
+ORDER BY s.created_at DESC;
+```
+
+If there are no results, check for completed reviews too:
+```sql
+SELECT status, COUNT(*) FROM downvote_review_queue GROUP BY status;
+```
+
+Report findings to the user. If no unreviewed snippets exist, say so and stop.
+
+### 2. Group by theme
+
+Analyze the snippet titles and categories to group them into thematic clusters. Present the groups to the user for review.
+
+### 3. Hide snippets
+
+For any unhidden snippets, insert into user_hide_snippets:
+```sql
+INSERT INTO user_hide_snippets (snippet)
+VALUES ('<snippet_id>')
+ON CONFLICT (snippet) DO NOTHING;
+```
+
+Also insert into the review queue:
+```sql
+INSERT INTO downvote_review_queue (snippet_id, downvoted_at)
+VALUES ('<snippet_id>', now())
+ON CONFLICT (snippet_id) DO NOTHING;
+```
+
+### 4. Research and create KB entries
+
+For each thematic group:
+1. Use subagents to research the correct facts via web search
+2. Find authoritative sources (Reuters, AP, NPR, official gov sites, fact-checkers)
+3. Check existing KB entries to avoid duplicates:
+   ```sql
+   SELECT id, fact FROM kb_entries WHERE status = 'active' AND fact ILIKE '%<keyword>%';
+   ```
+4. Insert new KB entries with sources using CTEs (see the filled-in example after this list):
+   ```sql
+   WITH new_entry AS (
+     INSERT INTO kb_entries (fact, related_claim, confidence_score,
+       disinformation_categories, keywords, is_time_sensitive,
+       valid_from, valid_until, created_by_model, status)
+     VALUES (...) RETURNING id
+   )
+   INSERT INTO kb_entry_sources (kb_entry, url, source_name, source_type,
+     relevant_excerpt, access_date)
+   SELECT id, ..., CURRENT_DATE FROM new_entry;
+   ```
+5. Use `created_by_model = 'claude-downvote-review'` for traceability
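+
+A filled-in version of the step-4 insert might look like this. All values are
+illustrative placeholders, and the array literals assume `disinformation_categories`
+and `keywords` are text[] columns:
+
+```sql
+WITH new_entry AS (
+  INSERT INTO kb_entries (fact, related_claim, confidence_score,
+    disinformation_categories, keywords, is_time_sensitive,
+    valid_from, valid_until, created_by_model, status)
+  VALUES ('<correct fact>', '<claim from snippet>', 85,
+    ARRAY['<category>'], ARRAY['<keyword_en>', '<keyword_es>'], false,
+    NULL, NULL, 'claude-downvote-review', 'active')
+  RETURNING id
+)
+INSERT INTO kb_entry_sources (kb_entry, url, source_name, source_type,
+  relevant_excerpt, access_date)
+SELECT id, '<url>', '<source name>', 'official_source',
+  '<relevant excerpt>', CURRENT_DATE
+FROM new_entry;
+```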
+
+### 5. Verify entries
+
+Launch verification subagents to fact-check each new KB entry against live web sources. Fix any inaccuracies they find.
+
+### 6. Generate embeddings
+
+Run the backfill script:
+```bash
+source .venv/bin/activate && python -m src.scripts.backfill_kb_embeddings
+```
+
+### 7. Update queue status
+
+Mark all processed snippets as completed:
+```sql
+UPDATE downvote_review_queue
+SET status = 'completed', processed_at = now(), kb_entries_created = <count>
+WHERE snippet_id IN ('<snippet_id_1>', '<snippet_id_2>', ...);
+```
+
+### 8. Generate report
+
+Create a markdown report at `reports/<date>_downvote_review.md` with:
+- Executive summary (snippets found, hidden, KB entries created)
+- Grouped snippet analysis
+- KB entries created with sources
+- Verification results and corrections
+- Database changes summary
+
+Commit and push the report.
+
+### 9. Post to Slack
+
+Post a summary to the #verdad channel (ID: C07JYU3729G) using the Slack MCP tools, linking to the GitHub report.
+
+## Notes
+
+- The VERDAD Supabase project ID is `dzujjhzgzguciwryzwlx`
+- Valid source_type values: tier1_wire_service, tier1_factchecker, tier2_major_news, tier3_regional_news, official_source, other
+- KB entries need confidence >= 70 and at least one external source
+- Include Spanish-language keywords since the pipeline analyzes Spanish-language radio
+- Always check existing KB entries before creating new ones to avoid duplicates (see the example below)
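+
+For semantic rather than keyword matching, the `find_duplicate_kb_entries` RPC
+shipped with the KB schema can be called directly (the embedding argument is a
+placeholder, and the positional parameter order is assumed):
+
+```sql
+SELECT * FROM find_duplicate_kb_entries('<query_embedding>', 0.92, 5);
+```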
diff --git a/fly.processing_worker.toml b/fly.processing_worker.toml
index f431dcd..fdcf553 100644
--- a/fly.processing_worker.toml
+++ b/fly.processing_worker.toml
@@ -18,6 +18,7 @@ primary_region = 'sea'
   undo_audio_clipping = ''
   analysis_review = ''
   analysis_review_2 = ''
+  downvote_review = ''
   embedding = ''
 
 [[vm]]
@@ -39,6 +40,12 @@ primary_region = 'sea'
   cpu_kind = 'shared'
   cpus = 8
 
+[[vm]]
+  processes = ["downvote_review"]
+  memory = '2gb'
+  cpu_kind = 'shared'
+  cpus = 4
+
 [[vm]]
   processes = ["regenerate_timestamped_transcript", "embedding"]
   memory = '1gb'
diff --git a/src/processing_pipeline/main.py b/src/processing_pipeline/main.py
index dcfdfa5..541d258 100644
--- a/src/processing_pipeline/main.py
+++ b/src/processing_pipeline/main.py
@@ -2,11 +2,17 @@
 from dotenv import load_dotenv
 from prefect import serve
 import sentry_sdk
-from processing_pipeline.stage_1 import initial_disinformation_detection, redo_main_detection, regenerate_timestamped_transcript, undo_disinformation_detection
+from processing_pipeline.stage_1 import (
+    initial_disinformation_detection,
+    redo_main_detection,
+    regenerate_timestamped_transcript,
+    undo_disinformation_detection,
+)
 from processing_pipeline.stage_2 import audio_clipping, undo_audio_clipping
 from processing_pipeline.stage_3 import in_depth_analysis
 from processing_pipeline.stage_5 import embedding
-from processing_pipeline.stage_4 import analysis_review
+from processing_pipeline.stage_4 import analysis_review, downvote_review  # noqa: F401
+
 load_dotenv()
 
 # Setup Sentry
@@ -51,7 +57,9 @@
         deployment = audio_clipping.to_deployment(
             name="Stage 2: Audio Clipping",
             concurrency_limit=100,
-            parameters=dict(context_before_seconds=90, context_after_seconds=60, repeat=True),
+            parameters=dict(
+                context_before_seconds=90, context_after_seconds=60, repeat=True
+            ),
         )
         serve(deployment, limit=100)
     case "undo_audio_clipping":
@@ -81,6 +89,13 @@
             parameters=dict(snippet_ids=[], repeat=True),
         )
         serve(deployment, limit=100)
+    case "downvote_review":
+        deployment = downvote_review.to_deployment(
+            name="Stage 4: Downvote Review",
+            concurrency_limit=1,
+            parameters=dict(repeat=True),
+        )
+        serve(deployment, limit=1)
     case "embedding":
         deployment = embedding.to_deployment(
             name="Stage 5: Embedding",
diff --git a/src/processing_pipeline/stage_4/__init__.py b/src/processing_pipeline/stage_4/__init__.py
index 648dcc0..a682067 100644
--- a/src/processing_pipeline/stage_4/__init__.py
+++ b/src/processing_pipeline/stage_4/__init__.py
@@ -1,4 +1,5 @@
 from .executor import Stage4Executor
 from .flows import analysis_review
+from .downvote_flows import downvote_review
 
-__all__ = ["Stage4Executor", "analysis_review"]
+__all__ = ["Stage4Executor", "analysis_review", "downvote_review"]
diff --git a/src/processing_pipeline/stage_4/downvote_flows.py b/src/processing_pipeline/stage_4/downvote_flows.py
new file mode 100644
index 0000000..cf011b1
--- /dev/null
+++ b/src/processing_pipeline/stage_4/downvote_flows.py
@@ -0,0 +1,122 @@
+import asyncio
+import os
+
+from prefect.task_runners import ConcurrentTaskRunner
+
+from processing_pipeline.constants import PromptStage
+from processing_pipeline.stage_4.constants import Stage4SubStage
+from processing_pipeline.stage_4.tasks import (
+    fetch_a_specific_snippet_from_supabase,
+    process_snippet,
+)
+from processing_pipeline.supabase_utils import SupabaseClient
+from utils import optional_flow
+
+
+@optional_flow(
+    name="Stage 4: Downvote Review",
+    log_prints=True,
+    task_runner=ConcurrentTaskRunner,
+)
+async def downvote_review(repeat=True):
+    """Process downvoted snippets through the Stage 4 KB review pipeline.
+
+    Polls the downvote_review_queue table for pending entries. For each:
+    1. Claims the entry (atomic status update to prevent double-processing)
+    2. Ensures the snippet is hidden
+    3. Runs the full Stage 4 review pipeline (reviewer + KB researcher +
+       web researcher + KB updater agents)
+    4. Marks the queue entry as completed or errored
+    """
+    # The Gemini key must be present; indexing (rather than .get) fails fast
+    # with a clear KeyError instead of a later TypeError from os.environ.
+    os.environ["GOOGLE_API_KEY"] = os.environ["GOOGLE_GEMINI_PAID_KEY"]
+
+    supabase_client = SupabaseClient(
+        supabase_url=os.getenv("SUPABASE_URL"),
+        supabase_key=os.getenv("SUPABASE_KEY"),
+    )
+
+    prompt_versions = {
+        "kb_researcher": supabase_client.get_active_prompt(
+            PromptStage.STAGE_4, Stage4SubStage.KB_RESEARCHER
+        ),
+        "web_researcher": supabase_client.get_active_prompt(
+            PromptStage.STAGE_4, Stage4SubStage.WEB_RESEARCHER
+        ),
+        "reviewer": supabase_client.get_active_prompt(
+            PromptStage.STAGE_4, Stage4SubStage.REVIEWER
+        ),
+        "kb_updater": supabase_client.get_active_prompt(
+            PromptStage.STAGE_4, Stage4SubStage.KB_UPDATER
+        ),
+    }
+
+    while True:
+        pending = supabase_client.get_pending_downvote_reviews(limit=1)
+        if not pending:
+            if not repeat:
+                print("No pending downvote reviews. Exiting.")
+                break
+            print("No pending downvote reviews. Sleeping 30s...")
+            await asyncio.sleep(30)
+            continue
+
+        queue_entry = pending[0]
+        claimed = supabase_client.claim_downvote_review(queue_entry["id"])
+        if not claimed:
+            print(f"Queue entry {queue_entry['id']} already claimed. Skipping.")
+            continue
+
+        snippet_id = queue_entry["snippet_id"]
+        print(f"Processing downvoted snippet: {snippet_id}")
+
+        snippet = fetch_a_specific_snippet_from_supabase(supabase_client, snippet_id)
+        if not snippet:
+            supabase_client.fail_downvote_review(queue_entry["id"], "Snippet not found")
+            continue
+
+        try:
+            supabase_client.hide_snippet_by_system(snippet_id)
+
+            # Prepend downvote context so the reviewer agents understand
+            # this snippet was flagged as a false positive by users
+            if snippet.get("context") and snippet["context"].get("main"):
+                downvote_prefix = (
+                    "[DOWNVOTE REVIEW CONTEXT] This snippet was downvoted by users "
+                    "as a FALSE POSITIVE — the content likely reports real events that "
+                    "were incorrectly flagged as disinformation. Focus on researching "
+                    "the correct facts and creating KB entries to prevent similar false "
+                    "positives in the future.\n\n"
+                )
+                snippet["context"]["main"] = (
+                    downvote_prefix + snippet["context"]["main"]
+                )
+
+            await process_snippet(supabase_client, snippet, prompt_versions)
+            # process_snippet does not report how many KB entries it created,
+            # so record a nominal count of 1 on the queue row.
+            supabase_client.complete_downvote_review(
+                queue_entry["id"], kb_entries_created=1
+            )
+            print(f"Downvote review completed for snippet {snippet_id}")
+
+        except Exception as e:
+            error_msg = str(e)
+            if isinstance(e, ExceptionGroup):
+                error_msg = "\n".join(
+                    f"- {type(exc).__name__}: {exc}" for exc in e.exceptions
+                )
+            print(f"Downvote review failed for snippet {snippet_id}: {error_msg}")
+            supabase_client.fail_downvote_review(queue_entry["id"], error_msg)
+
+        if not repeat:
+            break
+        await asyncio.sleep(2)
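+
+
+if __name__ == "__main__":
+    # Ad-hoc local sketch (assumption, not part of the deployed flow): run a
+    # single pass over the queue without the polling loop. In production the
+    # flow is served through processing_pipeline/main.py instead.
+    asyncio.run(downvote_review(repeat=False))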
diff --git a/src/processing_pipeline/supabase_utils.py b/src/processing_pipeline/supabase_utils.py
index f367892..01e7ac6 100644
--- a/src/processing_pipeline/supabase_utils.py
+++ b/src/processing_pipeline/supabase_utils.py
@@ -17,7 +17,9 @@ def get_a_new_audio_file_and_reserve_it(self):
         return response.data if response else None
 
     def get_a_new_stage_1_llm_response_and_reserve_it(self):
-        response = self.client.rpc("fetch_a_new_stage_1_llm_response_and_reserve_it").execute()
+        response = self.client.rpc(
+            "fetch_a_new_stage_1_llm_response_and_reserve_it"
+        ).execute()
         return response.data if response else None
 
     def get_a_new_snippet_and_reserve_it(self):
@@ -25,7 +27,9 @@ def get_a_new_snippet_and_reserve_it(self):
         return response.data if response else None
 
     def get_a_ready_for_review_snippet_and_reserve_it(self):
-        response = self.client.rpc("fetch_a_ready_for_review_snippet_and_reserve_it").execute()
+        response = self.client.rpc(
+            "fetch_a_ready_for_review_snippet_and_reserve_it"
+        ).execute()
         return response.data if response.data else None
 
     def get_snippet_by_id(self, id, select="*"):
@@ -37,11 +41,18 @@ def get_snippets_by_ids(self, ids, select="*"):
         return response.data
 
     def get_audio_file_by_id(self, id, select="*"):
-        response = self.client.table("audio_files").select(select).eq("id", id).execute()
+        response = (
+            self.client.table("audio_files").select(select).eq("id", id).execute()
+        )
         return response.data[0] if response.data else None
 
     def get_stage_1_llm_response_by_id(self, id, select="*"):
-        response = self.client.table("stage_1_llm_responses").select(select).eq("id", id).execute()
+        response = (
+            self.client.table("stage_1_llm_responses")
+            .select(select)
+            .eq("id", id)
+            .execute()
+        )
         return response.data[0] if response.data else None
 
     def set_audio_file_status(self, id, status, error_message=None):
@@ -53,7 +64,12 @@ def set_audio_file_status(self, id, status, error_message=None):
                 .execute()
             )
         else:
-            response = self.client.table("audio_files").update({"status": status}).eq("id", id).execute()
+            response = (
+                self.client.table("audio_files")
+                .update({"status": status})
+                .eq("id", id)
+                .execute()
+            )
         return response.data
 
     def set_stage_1_llm_response_status(self, id, status, error_message=None):
@@ -65,7 +81,12 @@ def set_stage_1_llm_response_status(self, id, status, error_message=None):
                 .execute()
             )
         else:
-            response = self.client.table("stage_1_llm_responses").update({"status": status}).eq("id", id).execute()
+            response = (
+                self.client.table("stage_1_llm_responses")
+                .update({"status": status})
+                .eq("id", id)
+                .execute()
+            )
         return response.data
 
     def set_snippet_status(self, id, status, error_message=None):
@@ -77,7 +98,12 @@ def set_snippet_status(self, id, status, error_message=None):
                 .execute()
             )
         else:
-            response = self.client.table("snippets").update({"status": status}).eq("id", id).execute()
+            response = (
+                self.client.table("snippets")
+                .update({"status": status})
+                .eq("id", id)
+                .execute()
+            )
         return response.data
 
     def insert_audio_file(
@@ -120,7 +146,9 @@ def get_active_prompt(self, stage: PromptStage, sub_stage: StrEnum | None = None
             query = query.is_("sub_stage", "null")
         response = query.limit(1).execute()
         if not response.data:
-            raise ValueError(f"No active prompt version found for stage: {stage}, sub_stage: {sub_stage}")
+            raise ValueError(
+                f"No active prompt version found for stage: {stage}, sub_stage: {sub_stage}"
+            )
         return response.data[0]
 
     def get_prompt_by_id(self, prompt_version_id: str):
@@ -290,27 +318,29 @@ def submit_snippet_review(
         political_leaning,
         grounding_metadata,
         reviewed_by,
-        thought_summaries=None
+        thought_summaries=None,
     ):
         response = (
             self.client.table("snippets")
-            .update({
-                "translation": translation,
-                "title": title,
-                "summary": summary,
-                "explanation": explanation,
-                "disinformation_categories": disinformation_categories,
-                "keywords_detected": keywords_detected,
-                "language": language,
-                "confidence_scores": confidence_scores,
-                "political_leaning": political_leaning,
-                "grounding_metadata": grounding_metadata,
-                "thought_summaries": thought_summaries,
-                "status": "Processed",
-                "error_message": None,
-                "reviewed_at": datetime.now(timezone.utc).isoformat(),
-                "reviewed_by": reviewed_by,
-            })
+            .update(
+                {
+                    "translation": translation,
+                    "title": title,
+                    "summary": summary,
+                    "explanation": explanation,
+                    "disinformation_categories": disinformation_categories,
+                    "keywords_detected": keywords_detected,
+                    "language": language,
+                    "confidence_scores": confidence_scores,
+                    "political_leaning": political_leaning,
+                    "grounding_metadata": grounding_metadata,
+                    "thought_summaries": thought_summaries,
+                    "status": "Processed",
+                    "error_message": None,
+                    "reviewed_at": datetime.now(timezone.utc).isoformat(),
+                    "reviewed_by": reviewed_by,
+                }
+            )
             .eq("id", id)
             .execute()
         )
@@ -355,13 +385,17 @@ def update_stage_1_llm_response_detection_result(self, id, detection_result):
         )
         return response.data
 
-    def update_stage_1_llm_response_timestamped_transcription(self, id, timestamped_transcription, transcriptor):
+    def update_stage_1_llm_response_timestamped_transcription(
+        self, id, timestamped_transcription, transcriptor
+    ):
         response = (
             self.client.table("stage_1_llm_responses")
-            .update({
-                "timestamped_transcription": timestamped_transcription,
-                "transcriptor": transcriptor
-            })
+            .update(
+                {
+                    "timestamped_transcription": timestamped_transcription,
+                    "transcriptor": transcriptor,
+                }
+            )
             .eq("id", id)
             .execute()
         )
@@ -379,81 +413,134 @@ def reset_stage_1_llm_response_status(self, id):
     def create_new_label(self, text, text_spanish):
         # Check if the label with the same text already exists
         existing_label = (
-            self.client.table("labels")
-            .select("*")
-            .eq("text", text)
-            .execute()
+            self.client.table("labels").select("*").eq("text", text).execute()
         )
         if existing_label.data:
             print(f"Label '{text}' already exists")
             return existing_label.data[0]
         else:
-            response = self.client.table("labels").insert({
-                "text": text,
-                "text_spanish": text_spanish,
-                "is_ai_suggested": True,
-            }).execute()
+            response = (
+                self.client.table("labels")
+                .insert(
+                    {
+                        "text": text,
+                        "text_spanish": text_spanish,
+                        "is_ai_suggested": True,
+                    }
+                )
+                .execute()
+            )
             return response.data[0]
 
     def assign_label_to_snippet(self, label_id, snippet_id):
         # Check if the label is already assigned to the snippet
-        existing_snippet_label = self.client.table("snippet_labels").select("*").eq("label", label_id).eq("snippet", snippet_id).execute()
+        existing_snippet_label = (
+            self.client.table("snippet_labels")
+            .select("*")
+            .eq("label", label_id)
+            .eq("snippet", snippet_id)
+            .execute()
+        )
         if existing_snippet_label.data:
             print(f"Label {label_id} already assigned to snippet {snippet_id}")
             return existing_snippet_label.data[0]
         else:
             response = (
                 self.client.table("snippet_labels")
-                .insert({
-                    "label": label_id,
-                    "snippet": snippet_id,
-                })
+                .insert(
+                    {
+                        "label": label_id,
+                        "snippet": snippet_id,
+                    }
+                )
                 .execute()
             )
             return response.data[0]
 
     def reset_audio_file_status(self, ids):
-        response = self.client.table("audio_files").update({"status": "New", "error_message": None}).in_("id", ids).execute()
+        response = (
+            self.client.table("audio_files")
+            .update({"status": "New", "error_message": None})
+            .in_("id", ids)
+            .execute()
+        )
         return response.data
 
     def delete_stage_1_llm_responses(self, audio_file_ids):
-        response = self.client.table("stage_1_llm_responses").delete().in_("audio_file", audio_file_ids).execute()
+        response = (
+            self.client.table("stage_1_llm_responses")
+            .delete()
+            .in_("audio_file", audio_file_ids)
+            .execute()
+        )
         return response.data
 
     def get_a_snippet_that_has_no_embedding(self):
         response = self.client.rpc("fetch_a_snippet_that_has_no_embedding").execute()
         return response.data if response.data else None
 
-    def upsert_snippet_embedding(self, snippet_id, snippet_document, document_token_count, embedding, model_name, status, error_message):
+    def upsert_snippet_embedding(
+        self,
+        snippet_id,
+        snippet_document,
+        document_token_count,
+        embedding,
+        model_name,
+        status,
+        error_message,
+    ):
         # Check if the embedding of the snippet already exists
-        existing_embedding = self.client.table("snippet_embeddings").select("id").eq("snippet", snippet_id).execute()
+        existing_embedding = (
+            self.client.table("snippet_embeddings")
+            .select("id")
+            .eq("snippet", snippet_id)
+            .execute()
+        )
         if existing_embedding.data:
             # If it exists, update the embedding
-            response = self.client.table("snippet_embeddings").update({
-                "snippet_document": snippet_document,
-                "document_token_count": document_token_count,
-                "embedding": embedding,
-                "model_name": model_name,
-                "status": status,
-                "error_message": error_message,
-            }).eq("snippet", snippet_id).execute()
+            response = (
+                self.client.table("snippet_embeddings")
+                .update(
+                    {
+                        "snippet_document": snippet_document,
+                        "document_token_count": document_token_count,
+                        "embedding": embedding,
+                        "model_name": model_name,
+                        "status": status,
+                        "error_message": error_message,
+                    }
+                )
+                .eq("snippet", snippet_id)
+                .execute()
+            )
            return response.data[0]
        else:
            # If not, insert the new embedding
-            response = self.client.table("snippet_embeddings").insert({
-                "snippet": snippet_id,
-                "snippet_document": snippet_document,
-                "document_token_count": document_token_count,
-                "embedding": embedding,
-                "model_name": model_name,
-                "status": status,
-                "error_message": error_message,
-            }).execute()
+            response = (
+                self.client.table("snippet_embeddings")
+                .insert(
+                    {
+                        "snippet": snippet_id,
+                        "snippet_document": snippet_document,
+                        "document_token_count": document_token_count,
+                        "embedding": embedding,
+                        "model_name": model_name,
+                        "status": status,
+                        "error_message": error_message,
+                    }
+                )
+                .execute()
+            )
             return response.data[0]
 
     def delete_vector_embedding_of_snippet(self, snippet_id):
-        response = self.client.table("snippet_embeddings").delete().eq("snippet", snippet_id).execute()
+        response = (
+            self.client.table("snippet_embeddings")
+            .delete()
+            .eq("snippet", snippet_id)
+            .execute()
+        )
         return response.data
 
     # Knowledge Base methods
@@ -480,15 +567,32 @@ def search_kb_entries(
         response = self.client.rpc("search_kb_entries", params).execute()
         return response.data if response.data else []
 
-    def find_duplicate_kb_entries(self, query_embedding, similarity_threshold=0.92, max_results=5):
-        response = self.client.rpc("find_duplicate_kb_entries", {
-            "query_embedding": query_embedding,
-            "similarity_threshold": similarity_threshold,
-            "max_results": max_results,
-        }).execute()
+    def find_duplicate_kb_entries(
+        self, query_embedding, similarity_threshold=0.92, max_results=5
+    ):
+        response = self.client.rpc(
+            "find_duplicate_kb_entries",
+            {
+                "query_embedding": query_embedding,
+                "similarity_threshold": similarity_threshold,
+                "max_results": max_results,
+            },
+        ).execute()
         return response.data if response.data else []
 
-    def insert_kb_entry(self, fact, confidence_score, disinformation_categories=None, keywords=None, related_claim=None, valid_from=None, valid_until=None, is_time_sensitive=False, created_by_snippet=None, created_by_model=None):
+    def insert_kb_entry(
+        self,
+        fact,
+        confidence_score,
+        disinformation_categories=None,
+        keywords=None,
+        related_claim=None,
+        valid_from=None,
+        valid_until=None,
+        is_time_sensitive=False,
+        created_by_snippet=None,
+        created_by_model=None,
+    ):
         data = {
             "fact": fact,
             "confidence_score": confidence_score,
@@ -538,10 +642,12 @@ def supersede_kb_entry(self, old_entry_id, new_entry_data):
         )
 
         # Update old entry: superseded
-        self.client.table("kb_entries").update({
-            "status": "superseded",
-            "superseded_by": new_entry["id"],
-        }).eq("id", old_entry_id).execute()
+        self.client.table("kb_entries").update(
+            {
+                "status": "superseded",
+                "superseded_by": new_entry["id"],
+            }
+        ).eq("id", old_entry_id).execute()
 
         # Delete old embedding
         self.delete_kb_entry_embedding(old_entry_id)
@@ -549,23 +655,47 @@
         return new_entry
 
     def deactivate_kb_entry(self, entry_id, reason):
-        response = self.client.table("kb_entries").update({
-            "status": "deactivated",
-            "deactivation_reason": reason,
-        }).eq("id", entry_id).execute()
+        response = (
+            self.client.table("kb_entries")
+            .update(
+                {
+                    "status": "deactivated",
+                    "deactivation_reason": reason,
+                }
+            )
+            .eq("id", entry_id)
+            .execute()
+        )
         # Delete embedding so it no longer appears in RAG queries
         self.delete_kb_entry_embedding(entry_id)
         return response.data[0] if response.data else None
 
     def get_kb_entry_by_id(self, entry_id):
-        response = self.client.table("kb_entries").select("*").eq("id", entry_id).execute()
+        response = (
+            self.client.table("kb_entries").select("*").eq("id", entry_id).execute()
+        )
         return response.data[0] if response.data else None
 
     def get_kb_entry_sources(self, kb_entry_id):
-        response = self.client.table("kb_entry_sources").select("*").eq("kb_entry", kb_entry_id).execute()
+        response = (
+            self.client.table("kb_entry_sources")
+            .select("*")
+            .eq("kb_entry", kb_entry_id)
+            .execute()
+        )
         return response.data if response.data else []
 
-    def insert_kb_entry_source(self, kb_entry_id, url, source_name, source_type, title=None, relevant_excerpt=None, publication_date=None, relevance_to_claim="provides_context"):
+    def insert_kb_entry_source(
+        self,
+        kb_entry_id,
+        url,
+        source_name,
+        source_type,
+        title=None,
+        relevant_excerpt=None,
+        publication_date=None,
+        relevance_to_claim="provides_context",
+    ):
         data = {
             "kb_entry": kb_entry_id,
             "url": url,
@@ -582,8 +712,20 @@ def insert_kb_entry_source(self, kb_entry_id, url, source_name, source_type, tit
         response = self.client.table("kb_entry_sources").insert(data).execute()
         return response.data[0]
 
-    def upsert_kb_entry_embedding(self, kb_entry_id, embedded_document, document_token_count, embedding, model_name):
-        existing = self.client.table("kb_entry_embeddings").select("id").eq("kb_entry", kb_entry_id).execute()
+    def upsert_kb_entry_embedding(
+        self,
+        kb_entry_id,
+        embedded_document,
+        document_token_count,
+        embedding,
+        model_name,
+    ):
+        existing = (
+            self.client.table("kb_entry_embeddings")
+            .select("id")
+            .eq("kb_entry", kb_entry_id)
+            .execute()
+        )
         data = {
             "embedded_document": embedded_document,
             "document_token_count": document_token_count,
@@ -593,17 +735,29 @@ def upsert_kb_entry_embedding(self, kb_entry_id, embedded_document, document_tok
             "error_message": None,
         }
         if existing.data:
-            response = self.client.table("kb_entry_embeddings").update(data).eq("kb_entry", kb_entry_id).execute()
+            response = (
+                self.client.table("kb_entry_embeddings")
+                .update(data)
+                .eq("kb_entry", kb_entry_id)
+                .execute()
+            )
         else:
             data["kb_entry"] = kb_entry_id
             response = self.client.table("kb_entry_embeddings").insert(data).execute()
         return response.data[0]
 
     def delete_kb_entry_embedding(self, kb_entry_id):
-        response = self.client.table("kb_entry_embeddings").delete().eq("kb_entry", kb_entry_id).execute()
+        response = (
+            self.client.table("kb_entry_embeddings")
+            .delete()
+            .eq("kb_entry", kb_entry_id)
+            .execute()
+        )
         return response.data
 
-    def record_kb_usage(self, kb_entry_id, snippet_id, usage_type, similarity_score=None):
+    def record_kb_usage(
+        self, kb_entry_id, snippet_id, usage_type, similarity_score=None
+    ):
         data = {
             "kb_entry": kb_entry_id,
             "snippet": snippet_id,
@@ -621,7 +775,82 @@ def record_kb_usage(self, kb_entry_id, snippet_id, usage_type, similarity_score=
             .execute()
         )
         if existing.data:
-            response = self.client.table("kb_entry_snippet_usage").update(data).eq("id", existing.data[0]["id"]).execute()
+            response = (
+                self.client.table("kb_entry_snippet_usage")
+                .update(data)
+                .eq("id", existing.data[0]["id"])
+                .execute()
+            )
         else:
-            response = self.client.table("kb_entry_snippet_usage").insert(data).execute()
+            response = (
+                self.client.table("kb_entry_snippet_usage").insert(data).execute()
+            )
         return response.data[0]
+
+    # ── Downvote Review Queue ─────────────────────────────────────────
+
+    def get_pending_downvote_reviews(self, limit=10):
+        """Fetch pending downvote review queue entries."""
+        response = (
+            self.client.table("downvote_review_queue")
+            .select("*")
+            .eq("status", "pending")
+            .order("created_at")
+            .limit(limit)
+            .execute()
+        )
+        return response.data if response.data else []
+
+    def claim_downvote_review(self, queue_id):
+        """Atomically claim a downvote review entry for processing."""
+        # Compare-and-set: filtering on status="pending" makes this update a
+        # no-op when another worker has already claimed the row, so at most
+        # one caller receives a non-empty response.
+        response = (
+            self.client.table("downvote_review_queue")
+            .update({"status": "processing"})
+            .eq("id", queue_id)
+            .eq("status", "pending")
+            .execute()
+        )
+        return response.data[0] if response.data else None
+
+    def complete_downvote_review(self, queue_id, kb_entries_created=0):
+        """Mark a downvote review as completed."""
+        response = (
+            self.client.table("downvote_review_queue")
+            .update(
+                {
+                    "status": "completed",
+                    "processed_at": datetime.now(timezone.utc).isoformat(),
+                    "kb_entries_created": kb_entries_created,
+                }
+            )
+            .eq("id", queue_id)
+            .execute()
+        )
+        return response.data[0] if response.data else None
+
+    def fail_downvote_review(self, queue_id, error_message):
+        """Mark a downvote review as failed."""
+        response = (
+            self.client.table("downvote_review_queue")
+            .update(
+                {
+                    "status": "error",
+                    "error_message": error_message,
+                }
+            )
+            .eq("id", queue_id)
+            .execute()
+        )
+        return response.data[0] if response.data else None
+
+    def hide_snippet_by_system(self, snippet_id):
+        """Hide a snippet programmatically (no auth/admin check)."""
+        response = (
+            self.client.table("user_hide_snippets")
+            .upsert({"snippet": snippet_id}, on_conflict="snippet")
+            .execute()
+        )
+        return response.data[0] if response.data else None
diff --git a/supabase/database/sql/create_downvote_review_queue.sql b/supabase/database/sql/create_downvote_review_queue.sql
new file mode 100644
index 0000000..ec63c06
--- /dev/null
+++ b/supabase/database/sql/create_downvote_review_queue.sql
@@ -0,0 +1,37 @@
+-- Downvote Review Queue
+-- Tracks downvoted snippets awaiting KB review.
+-- When a user downvotes a snippet (value=-1 in user_like_snippets),
+-- a trigger queues it here for automated review and KB entry creation.
+
+CREATE TABLE IF NOT EXISTS public.downvote_review_queue (
+  id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
+  snippet_id UUID NOT NULL REFERENCES public.snippets(id) ON DELETE CASCADE,
+  status TEXT NOT NULL DEFAULT 'pending'
+    CHECK (status IN ('pending', 'processing', 'completed', 'error')),
+  downvoted_by UUID,
+  downvoted_at TIMESTAMPTZ DEFAULT now(),
+  processed_at TIMESTAMPTZ,
+  kb_entries_created INTEGER DEFAULT 0,
+  error_message TEXT,
+  created_at TIMESTAMPTZ DEFAULT now(),
+  CONSTRAINT unique_snippet_in_queue UNIQUE (snippet_id)
+);
+
+CREATE INDEX IF NOT EXISTS idx_downvote_review_queue_status
+  ON public.downvote_review_queue(status);
+
+-- RLS
+ALTER TABLE public.downvote_review_queue ENABLE ROW LEVEL SECURITY;
+
+GRANT ALL ON TABLE public.downvote_review_queue TO service_role;
+GRANT SELECT ON TABLE public.downvote_review_queue TO authenticated;
+
+CREATE POLICY "Enable full access for service role"
+  ON public.downvote_review_queue FOR ALL TO service_role USING (true);
+
+CREATE POLICY "Enable read access for authenticated users"
+  ON public.downvote_review_queue FOR SELECT TO authenticated USING (true);
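+
+-- Example: monitor queue health by status (the same query the
+-- downvote-review command uses as a fallback check):
+-- SELECT status, COUNT(*) FROM downvote_review_queue GROUP BY status;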
diff --git a/supabase/database/sql/on_downvote_queue_review.sql b/supabase/database/sql/on_downvote_queue_review.sql
new file mode 100644
index 0000000..73bb14a
--- /dev/null
+++ b/supabase/database/sql/on_downvote_queue_review.sql
@@ -0,0 +1,36 @@
+-- Trigger: On downvote, immediately hide the snippet and queue a KB review.
+-- Fires on INSERT into user_like_snippets when value = -1.
+-- This replaces the old behavior of waiting for 2 downvotes to hide.
+
+CREATE OR REPLACE FUNCTION on_downvote_queue_review()
+RETURNS TRIGGER AS $$
+BEGIN
+  -- Only process downvotes (value = -1)
+  IF NEW.value = -1 THEN
+    -- Immediately hide the snippet
+    INSERT INTO user_hide_snippets (snippet)
+    VALUES (NEW.snippet)
+    ON CONFLICT (snippet) DO NOTHING;
+
+    -- Queue for KB review (UNIQUE constraint prevents duplicates)
+    INSERT INTO downvote_review_queue (snippet_id, downvoted_by, downvoted_at)
+    VALUES (NEW.snippet, NEW."user", now())
+    ON CONFLICT (snippet_id) DO NOTHING;
+  END IF;
+
+  RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+-- Recreate idempotently (CREATE TRIGGER has no OR REPLACE in Postgres)
+DROP TRIGGER IF EXISTS on_downvote_queue_review_trigger ON user_like_snippets;
+CREATE TRIGGER on_downvote_queue_review_trigger
+AFTER INSERT ON user_like_snippets
+FOR EACH ROW
+EXECUTE FUNCTION on_downvote_queue_review();
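+
+-- Hypothetical smoke test (placeholder IDs): after inserting a downvote,
+-- the snippet should appear in user_hide_snippets and in the queue.
+-- INSERT INTO user_like_snippets ("user", snippet, value)
+--   VALUES ('<user_id>', '<snippet_id>', -1);
+-- SELECT status FROM downvote_review_queue WHERE snippet_id = '<snippet_id>';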