124 changes: 124 additions & 0 deletions .claude/commands/downvote-review.md
# Downvote Review

Review downvoted snippets, hide them, create corrective KB entries, and generate a report.

## Steps

### 1. Find unreviewed downvoted snippets

Use the Supabase MCP to query for downvoted snippets that haven't been processed yet:

```sql
SELECT
    s.id AS snippet_id,
    s.title,
    s.explanation,
    s.disinformation_categories,
    s.confidence_scores,
    s.created_at,
    drq.status AS queue_status,
    COUNT(uls.id) FILTER (WHERE uls.value = -1) AS downvote_count
FROM snippets s
JOIN user_like_snippets uls ON uls.snippet = s.id
LEFT JOIN downvote_review_queue drq ON drq.snippet_id = s.id
WHERE uls.value = -1
AND (drq.status IS NULL OR drq.status = 'pending' OR drq.status = 'error')
AND NOT EXISTS (
    SELECT 1 FROM user_hide_snippets uhs WHERE uhs.snippet = s.id
)
GROUP BY s.id, drq.status
ORDER BY s.created_at DESC;
```

**Contributor comment on lines +26 to +28 (medium):**

The condition `NOT EXISTS (SELECT 1 FROM user_hide_snippets ...)` seems to contradict the new trigger's behavior. The `on_downvote_queue_review` trigger immediately hides a snippet upon a downvote by inserting it into `user_hide_snippets`. Consequently, this query will not find any snippets downvoted after the trigger is active, potentially rendering this part of the on-demand skill ineffective for new downvotes.

To ensure this command can process all relevant downvoted snippets, including those already hidden by the trigger, I recommend removing this `NOT EXISTS` clause. The subsequent "Hide snippets" step already uses `ON CONFLICT DO NOTHING`, which makes it safe to run even for snippets that are already hidden.
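The trigger this comment references isn't included in the PR; a minimal sketch of the behavior it describes might look like the following, with table and column names assumed from the query above:

```sql
-- Hypothetical sketch of on_downvote_queue_review (not part of this diff):
-- on a -1 vote, queue the snippet for review and hide it immediately.
CREATE OR REPLACE FUNCTION on_downvote_queue_review() RETURNS trigger AS $$
BEGIN
    IF NEW.value = -1 THEN
        INSERT INTO downvote_review_queue (snippet_id, downvoted_at)
        VALUES (NEW.snippet, now())
        ON CONFLICT (snippet_id) DO NOTHING;

        INSERT INTO user_hide_snippets (snippet)
        VALUES (NEW.snippet)
        ON CONFLICT (snippet) DO NOTHING;
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER downvote_queue_review
AFTER INSERT OR UPDATE ON user_like_snippets
FOR EACH ROW EXECUTE FUNCTION on_downvote_queue_review();
```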
**Review comment on lines +24 to +30 (⚠️ Potential issue | 🟠 Major):**

This query filters out the very rows the new trigger creates. The new trigger hides snippets on the first downvote, so `AND NOT EXISTS (SELECT 1 FROM user_hide_snippets ...)` removes every freshly queued item. This command will report "no unreviewed snippets" even while `downvote_review_queue` still has pending work.

Suggested fix:

```diff
-SELECT
-    s.id AS snippet_id,
-    s.title,
-    s.explanation,
-    s.disinformation_categories,
-    s.confidence_scores,
-    s.created_at,
-    drq.status AS queue_status,
-    COUNT(uls.id) FILTER (WHERE uls.value = -1) AS downvote_count
-FROM snippets s
-JOIN user_like_snippets uls ON uls.snippet = s.id
-LEFT JOIN downvote_review_queue drq ON drq.snippet_id = s.id
-WHERE uls.value = -1
-AND (drq.status IS NULL OR drq.status = 'pending' OR drq.status = 'error')
-AND NOT EXISTS (
-    SELECT 1 FROM user_hide_snippets uhs WHERE uhs.snippet = s.id
-)
-GROUP BY s.id, drq.status
-ORDER BY s.created_at DESC;
+SELECT
+    s.id AS snippet_id,
+    s.title,
+    s.explanation,
+    s.disinformation_categories,
+    s.confidence_scores,
+    s.created_at,
+    drq.status AS queue_status,
+    drq.downvoted_at
+FROM downvote_review_queue drq
+JOIN snippets s ON s.id = drq.snippet_id
+WHERE drq.status IN ('pending', 'error')
+ORDER BY drq.downvoted_at DESC;
```

🤖 Prompt for AI Agents:

```
Verify each finding against the current code and only fix it if needed.

In .claude/commands/downvote-review.md around lines 24-30, the WHERE clause
currently excludes rows present in user_hide_snippets, which filters out items
newly queued by the trigger; update the query that references tables/aliases
uls, drq, s and the user_hide_snippets subquery so that snippets with a
corresponding downvote_review_queue record (drq) are not excluded — e.g., remove
the NOT EXISTS(...) filter or change it to allow rows when drq.status IS NOT
NULL or IN ('pending', 'error') or when a drq.id exists, ensuring the query
returns items in downvote_review_queue while still excluding other hidden
snippets.
```

If no results, check for completed reviews too:
```sql
SELECT status, COUNT(*) FROM downvote_review_queue GROUP BY status;
```

Report findings to the user. If no unreviewed snippets exist, say so and stop.

### 2. Group by theme

Analyze the snippet titles and categories to group them into thematic clusters. Present the groups to the user for review.

### 3. Hide snippets

For any unhidden snippets, insert into user_hide_snippets:
```sql
INSERT INTO user_hide_snippets (snippet)
VALUES ('<snippet_id>')
ON CONFLICT (snippet) DO NOTHING;
```

Also insert into the review queue:
```sql
INSERT INTO downvote_review_queue (snippet_id, downvoted_at)
VALUES ('<snippet_id>', now())
ON CONFLICT (snippet_id) DO NOTHING;
```

### 4. Research and create KB entries

For each thematic group:
1. Use subagents to research the correct facts via web search
2. Find authoritative sources (Reuters, AP, NPR, official gov sites, fact-checkers)
3. Check existing KB entries to avoid duplicates:
```sql
SELECT id, fact FROM kb_entries WHERE status = 'active' AND fact ILIKE '%<keyword>%';
```
4. Insert new KB entries with sources using CTEs (a worked example with illustrative values follows this list):
```sql
WITH new_entry AS (
  INSERT INTO kb_entries (fact, related_claim, confidence_score,
                          disinformation_categories, keywords, is_time_sensitive,
                          valid_from, valid_until, created_by_model, status)
  VALUES (...) RETURNING id
)
INSERT INTO kb_entry_sources (kb_entry, url, source_name, source_type,
                              relevant_excerpt, access_date)
SELECT id, ..., CURRENT_DATE FROM new_entry;
```
5. Use `created_by_model = 'claude-downvote-review'` for traceability

### 5. Verify entries

Launch verification subagents to fact-check each new KB entry against live web sources. Fix any inaccuracies found.

### 6. Generate embeddings

Run the backfill script:
```bash
source .venv/bin/activate && python -m src.scripts.backfill_kb_embeddings
```
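
The backfill script itself isn't part of this PR; as a rough sketch of the pattern (every helper name below is hypothetical), it presumably selects KB entries that lack an embedding, embeds their text, and writes the vectors back:

```python
# Hypothetical sketch of src/scripts/backfill_kb_embeddings; the real module
# is not shown in this PR, and every helper name here is an assumption.
from processing_pipeline.supabase_utils import SupabaseClient


def embed(text: str) -> list[float]:
    """Stand-in for whatever embedding model the pipeline actually calls."""
    raise NotImplementedError


def backfill_kb_embeddings(client: SupabaseClient) -> None:
    # Fetch entries with no embedding yet, embed the fact text, write back.
    for entry in client.get_kb_entries_without_embeddings():  # hypothetical helper
        vector = embed(entry["fact"])
        client.update_kb_entry_embedding(entry["id"], vector)  # hypothetical helper
```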

### 7. Update queue status

Mark all processed snippets as completed:
```sql
UPDATE downvote_review_queue
SET status = 'completed', processed_at = now(), kb_entries_created = <count>
WHERE snippet_id IN ('<id1>', '<id2>', ...);
```

### 8. Generate report

Create a markdown report at `reports/<date>_downvote_review.md` with:
- Executive summary (snippets found, hidden, KB entries created)
- Grouped snippet analysis
- KB entries created with sources
- Verification results and corrections
- Database changes summary

Commit and push the report.

### 9. Post to Slack

Post a summary to #verdad channel (ID: C07JYU3729G) using the Slack MCP tools, linking to the GitHub report.

## Notes

- The VERDAD Supabase project ID is `dzujjhzgzguciwryzwlx`
- Valid `source_type` values: `tier1_wire_service`, `tier1_factchecker`, `tier2_major_news`, `tier3_regional_news`, `official_source`, `other`
- KB entries need confidence >= 70 and at least one external source
- Include Spanish-language keywords since the pipeline analyzes Spanish-language radio
- Always check existing KB entries before creating new ones to avoid duplicates

**Contributor comment on the project-ID note (medium):**

Hardcoding the Supabase project ID here is a security and maintainability concern. While project IDs are not typically as sensitive as API keys, it is better practice to avoid hardcoding such values. If this command is executed in an environment where environment variables are available, it would be more secure to fetch the ID from an environment variable. This would also make it easier to point the skill at different environments (e.g., staging, production) without changing the command definition.

Suggested change:

```diff
-- The VERDAD Supabase project ID is `dzujjhzgzguciwryzwlx`
+- The VERDAD Supabase project ID is `${VERDAD_SUPABASE_PROJECT_ID}`
```
7 changes: 7 additions & 0 deletions fly.processing_worker.toml
```diff
@@ -18,6 +18,7 @@ primary_region = 'sea'
 undo_audio_clipping = ''
 analysis_review = ''
 analysis_review_2 = ''
+downvote_review = ''
 embedding = ''

 [[vm]]
@@ -39,6 +40,12 @@ primary_region = 'sea'
 cpu_kind = 'shared'
 cpus = 8

+[[vm]]
+processes = ["downvote_review"]
+memory = '2gb'
+cpu_kind = 'shared'
+cpus = 4
+
 [[vm]]
 processes = ["regenerate_timestamped_transcript", "embedding"]
 memory = '1gb'
```
21 changes: 18 additions & 3 deletions src/processing_pipeline/main.py
```diff
@@ -2,11 +2,17 @@
 from dotenv import load_dotenv
 from prefect import serve
 import sentry_sdk
-from processing_pipeline.stage_1 import initial_disinformation_detection, redo_main_detection, regenerate_timestamped_transcript, undo_disinformation_detection
+from processing_pipeline.stage_1 import (
+    initial_disinformation_detection,
+    redo_main_detection,
+    regenerate_timestamped_transcript,
+    undo_disinformation_detection,
+)
 from processing_pipeline.stage_2 import audio_clipping, undo_audio_clipping
 from processing_pipeline.stage_3 import in_depth_analysis
 from processing_pipeline.stage_5 import embedding
-from processing_pipeline.stage_4 import analysis_review
+from processing_pipeline.stage_4 import analysis_review, downvote_review  # noqa: F401

 load_dotenv()

 # Setup Sentry
@@ -51,7 +57,9 @@
         deployment = audio_clipping.to_deployment(
             name="Stage 2: Audio Clipping",
             concurrency_limit=100,
-            parameters=dict(context_before_seconds=90, context_after_seconds=60, repeat=True),
+            parameters=dict(
+                context_before_seconds=90, context_after_seconds=60, repeat=True
+            ),
         )
         serve(deployment, limit=100)
     case "undo_audio_clipping":
@@ -81,6 +89,13 @@
             parameters=dict(snippet_ids=[], repeat=True),
         )
         serve(deployment, limit=100)
+    case "downvote_review":
+        deployment = downvote_review.to_deployment(
+            name="Stage 4: Downvote Review",
+            concurrency_limit=1,
+            parameters=dict(repeat=True),
+        )
+        serve(deployment, limit=1)
     case "embedding":
         deployment = embedding.to_deployment(
             name="Stage 5: Embedding",
```
3 changes: 2 additions & 1 deletion src/processing_pipeline/stage_4/__init__.py
```diff
@@ -1,4 +1,5 @@
 from .executor import Stage4Executor
 from .flows import analysis_review
+from .downvote_flows import downvote_review

-__all__ = ["Stage4Executor", "analysis_review"]
+__all__ = ["Stage4Executor", "analysis_review", "downvote_review"]
```
111 changes: 111 additions & 0 deletions src/processing_pipeline/stage_4/downvote_flows.py
```python
import asyncio
import os

from prefect.task_runners import ConcurrentTaskRunner

from processing_pipeline.constants import PromptStage
from processing_pipeline.stage_4.constants import Stage4SubStage
from processing_pipeline.stage_4.tasks import (
    fetch_a_specific_snippet_from_supabase,
    process_snippet,
)
from processing_pipeline.supabase_utils import SupabaseClient
from utils import optional_flow


@optional_flow(
    name="Stage 4: Downvote Review",
    log_prints=True,
    task_runner=ConcurrentTaskRunner,
)
async def downvote_review(repeat=True):
    """Process downvoted snippets through the Stage 4 KB review pipeline.

    Polls the downvote_review_queue table for pending entries. For each:
    1. Claims the entry (atomic status update to prevent double-processing)
    2. Ensures the snippet is hidden
    3. Runs the full Stage 4 review pipeline (reviewer + KB researcher +
       web researcher + KB updater agents)
    4. Marks the queue entry as completed or errored
    """
    os.environ["GOOGLE_API_KEY"] = os.environ.get("GOOGLE_GEMINI_PAID_KEY")
```
**Contributor comment on line 31 (medium):**

Modifying `os.environ` at runtime is generally considered an unsafe practice. It creates a global side effect that can lead to unpredictable behavior in other parts of the application that rely on the `GOOGLE_API_KEY` environment variable, and it makes the code harder to reason about and debug.

A safer approach would be to pass the API key explicitly to the client or function that requires it, rather than altering the environment. This would make the dependency clear and avoid unintended consequences.
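
A minimal sketch of that alternative (the helper below and the `google_api_key` parameter on `process_snippet` are hypothetical; the actual Gemini call sites aren't shown in this diff):

```python
import os


def get_google_api_key() -> str:
    """Read the paid Gemini key explicitly instead of mutating os.environ."""
    key = os.getenv("GOOGLE_GEMINI_PAID_KEY")
    if not key:
        raise RuntimeError("GOOGLE_GEMINI_PAID_KEY is not set")
    return key


# Hypothetical usage inside the flow, threading the key to the code that
# actually builds the Gemini client:
#
#     await process_snippet(
#         supabase_client, snippet, prompt_versions,
#         google_api_key=get_google_api_key(),  # hypothetical parameter
#     )
```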

**Review comment on line 31 (⚠️ Potential issue | 🟠 Major):**

Guard the Google key before writing to `os.environ`. `os.environ["GOOGLE_API_KEY"] = None` raises a `TypeError`, so a missing `GOOGLE_GEMINI_PAID_KEY` crashes the worker before it can emit a useful error.

Suggested fix:

```diff
-    os.environ["GOOGLE_API_KEY"] = os.environ.get("GOOGLE_GEMINI_PAID_KEY")
+    google_api_key = os.getenv("GOOGLE_GEMINI_PAID_KEY")
+    if not google_api_key:
+        raise RuntimeError("GOOGLE_GEMINI_PAID_KEY is not set")
+    os.environ["GOOGLE_API_KEY"] = google_api_key
```

🤖 Prompt for AI Agents:

```
Verify each finding against the current code and only fix it if needed.

In src/processing_pipeline/stage_4/downvote_flows.py at line 31, the assignment
to os.environ["GOOGLE_API_KEY"] should be guarded against a missing
GOOGLE_GEMINI_PAID_KEY to avoid assigning None; check the value returned by
os.environ.get("GOOGLE_GEMINI_PAID_KEY") first and only set
os.environ["GOOGLE_API_KEY"] when it's truthy, otherwise emit a clear error via
the module's logger (or raise a descriptive exception) so the worker fails with
an actionable message.
```

The flow body continues:

```python
    supabase_client = SupabaseClient(
        supabase_url=os.getenv("SUPABASE_URL"),
        supabase_key=os.getenv("SUPABASE_KEY"),
    )

    prompt_versions = {
        "kb_researcher": supabase_client.get_active_prompt(
            PromptStage.STAGE_4, Stage4SubStage.KB_RESEARCHER
        ),
        "web_researcher": supabase_client.get_active_prompt(
            PromptStage.STAGE_4, Stage4SubStage.WEB_RESEARCHER
        ),
        "reviewer": supabase_client.get_active_prompt(
            PromptStage.STAGE_4, Stage4SubStage.REVIEWER
        ),
        "kb_updater": supabase_client.get_active_prompt(
            PromptStage.STAGE_4, Stage4SubStage.KB_UPDATER
        ),
    }

    while True:
        pending = supabase_client.get_pending_downvote_reviews(limit=1)
        if not pending:
            if not repeat:
                print("No pending downvote reviews. Exiting.")
                break
            print("No pending downvote reviews. Sleeping 30s...")
            await asyncio.sleep(30)
            continue

        queue_entry = pending[0]
        claimed = supabase_client.claim_downvote_review(queue_entry["id"])
        if not claimed:
            print(f"Queue entry {queue_entry['id']} already claimed. Skipping.")
            continue

        snippet_id = queue_entry["snippet_id"]
        print(f"Processing downvoted snippet: {snippet_id}")

        snippet = fetch_a_specific_snippet_from_supabase(supabase_client, snippet_id)
        if not snippet:
            supabase_client.fail_downvote_review(queue_entry["id"], "Snippet not found")
            continue

        try:
            supabase_client.hide_snippet_by_system(snippet_id)

            # Prepend downvote context so the reviewer agents understand
            # this snippet was flagged as a false positive by users
            if snippet.get("context") and snippet["context"].get("main"):
                downvote_prefix = (
                    "[DOWNVOTE REVIEW CONTEXT] This snippet was downvoted by users "
                    "as a FALSE POSITIVE — the content likely reports real events that "
                    "were incorrectly flagged as disinformation. Focus on researching "
                    "the correct facts and creating KB entries to prevent similar false "
                    "positives in the future.\n\n"
                )
                snippet["context"]["main"] = (
                    downvote_prefix + snippet["context"]["main"]
                )

            await process_snippet(supabase_client, snippet, prompt_versions)
            supabase_client.complete_downvote_review(
                queue_entry["id"], kb_entries_created=1
            )
```
**Review comment on lines +94 to +96 (P1):**

Fail queue items when Stage 4 processing fails. `downvote_review` marks the queue entry completed immediately after `process_snippet`, but `process_snippet` catches its own exceptions and does not re-raise (it only sets the snippet status to Error). That means failed reviews still get `status='completed'` here and are never retried, causing silent data loss in the downvote review queue.
**Contributor comment on lines +95 to +97 (high):**

The number of created knowledge base entries is hardcoded to 1. However, the `process_snippet` agentic pipeline might create zero, one, or multiple KB entries. This hardcoded value will result in inaccurate data being stored in the `downvote_review_queue` table.

To fix this, the `process_snippet` function (and its underlying `Stage4Executor`) should return the actual count of KB entries created during its execution. This count can then be passed to `complete_downvote_review`.

Suggested change:

```diff
-            supabase_client.complete_downvote_review(
-                queue_entry["id"], kb_entries_created=1
-            )
+            kb_entries_created_count = await process_snippet(supabase_client, snippet, prompt_versions)
+            supabase_client.complete_downvote_review(
+                queue_entry["id"], kb_entries_created=kb_entries_created_count
+            )
```

**Review comment on lines +94 to +97 (⚠️ Potential issue | 🟠 Major):**

Don't mark the queue item completed unconditionally. `process_snippet()` catches its own failures in `src/processing_pipeline/stage_4/tasks.py:104-150` and returns no result, so this branch also runs when Stage 4 actually failed. That both hides failed work under `completed` and hardcodes `kb_entries_created=1` regardless of whether zero, one, or many KB entries were written.

Suggested direction:

```diff
-            await process_snippet(supabase_client, snippet, prompt_versions)
-            supabase_client.complete_downvote_review(
-                queue_entry["id"], kb_entries_created=1
-            )
+            result = await process_snippet(supabase_client, snippet, prompt_versions)
+            if not result["success"]:
+                supabase_client.fail_downvote_review(queue_entry["id"], result["error"])
+                continue
+            supabase_client.complete_downvote_review(
+                queue_entry["id"],
+                kb_entries_created=result["kb_entries_created"],
+            )
```

This needs a matching change in `src/processing_pipeline/stage_4/tasks.py` so `process_snippet()` returns a structured outcome instead of swallowing errors.

🤖 Prompt for AI Agents:

```
Verify each finding against the current code and only fix it if needed.

In src/processing_pipeline/stage_4/downvote_flows.py around lines 94-97,
process_snippet currently swallows errors and returns nothing, but
downvote_flows calls supabase_client.complete_downvote_review unconditionally
with kb_entries_created=1; change process_snippet (in
src/processing_pipeline/stage_4/tasks.py) to return a structured outcome object
(e.g. { success: bool, kb_entries_created: int, error?: Error }) instead of
swallowing failures, then update the caller in downvote_flows.py to check that
outcome: only call supabase_client.complete_downvote_review(queue_entry["id"],
kb_entries_created=outcome.kb_entries_created) when outcome.success is true, or
otherwise call a failure path such as
supabase_client.mark_downvote_review_failed with the error info; use the
symbols process_snippet and supabase_client.complete_downvote_review to locate
where to change behavior.
```
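
A sketch of that matching change in tasks.py, assuming the current body wraps the agent run in a try/except; `run_stage_4_agents` and `set_snippet_status` are hypothetical stand-ins for whatever the real implementation calls:

```python
from dataclasses import dataclass


@dataclass
class Stage4Outcome:
    success: bool
    kb_entries_created: int = 0
    error: str | None = None


async def process_snippet(supabase_client, snippet, prompt_versions) -> Stage4Outcome:
    try:
        # run_stage_4_agents stands in for the real reviewer/researcher/updater run
        created = await run_stage_4_agents(snippet, prompt_versions)
        return Stage4Outcome(success=True, kb_entries_created=created)
    except Exception as e:
        # Preserve the current behavior of marking the snippet errored,
        # but surface the failure to the caller instead of swallowing it.
        supabase_client.set_snippet_status(snippet["id"], "Error")  # hypothetical call
        return Stage4Outcome(success=False, error=str(e))
```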

print(f"Downvote review completed for snippet {snippet_id}")

except Exception as e:
error_msg = str(e)
if isinstance(e, ExceptionGroup):
error_msg = "\n".join(
f"- {type(exc).__name__}: {exc}" for exc in e.exceptions
)
print(f"Downvote review failed for snippet {snippet_id}: {error_msg}")
supabase_client.fail_downvote_review(queue_entry["id"], error_msg)

if not repeat:
break
await asyncio.sleep(2)
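
The `SupabaseClient` helpers used throughout (`claim_downvote_review`, `complete_downvote_review`, `fail_downvote_review`) aren't shown in this PR; the atomic claim from step 1 of the docstring plausibly reduces to a conditional update along these lines (status values and column names are assumptions):

```sql
-- Hypothetical shape of claim_downvote_review: only one worker can move a
-- queue entry out of 'pending'/'error', which prevents double-processing.
UPDATE downvote_review_queue
SET status = 'processing'
WHERE id = :queue_entry_id
  AND status IN ('pending', 'error')
RETURNING id;
-- Zero rows returned means another worker already claimed this entry.
```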