diff --git a/prompts/snippet_feedback_validation/output_schema.json b/prompts/snippet_feedback_validation/output_schema.json new file mode 100644 index 0000000..047c4b6 --- /dev/null +++ b/prompts/snippet_feedback_validation/output_schema.json @@ -0,0 +1,135 @@ +{ + "type": "object", + "required": [ + "original_claim_summary", + "user_feedback_summary", + "claim_verifications", + "user_feedback_assessment", + "validation_decision", + "error_pattern", + "thought_summaries" + ], + "properties": { + "original_claim_summary": { + "type": "string", + "description": "Brief summary of what Stage 3 flagged as misinformation" + }, + "user_feedback_summary": { + "type": "string", + "description": "Brief summary of user feedback and their apparent reasoning" + }, + "claim_verifications": { + "type": "array", + "description": "Verification of each major claim from the original analysis", + "items": { + "type": "object", + "required": [ + "claim", + "original_assessment", + "verification_finding", + "is_claim_actually_false", + "confidence" + ], + "properties": { + "claim": { + "type": "string", + "description": "The specific claim being verified" + }, + "original_assessment": { + "type": "string", + "description": "What Stage 3 concluded about this claim" + }, + "verification_finding": { + "type": "string", + "description": "What web search reveals about this claim" + }, + "is_claim_actually_false": { + "type": "boolean", + "description": "Whether the claim is demonstrably false" + }, + "confidence": { + "type": "integer", + "minimum": 0, + "maximum": 100, + "description": "Confidence in this verification" + } + } + } + }, + "user_feedback_assessment": { + "type": "object", + "required": [ + "feedback_quality", + "feedback_reasoning", + "appears_adversarial" + ], + "properties": { + "feedback_quality": { + "type": "string", + "enum": ["high", "medium", "low"], + "description": "Quality of user-provided feedback" + }, + "feedback_reasoning": { + "type": "string", + "description": "Assessment of why user disliked/labeled the snippet" + }, + "appears_adversarial": { + "type": "boolean", + "description": "Whether feedback appears to be bad-faith or coordinated" + } + } + }, + "validation_decision": { + "type": "object", + "required": ["status", "confidence", "primary_reason"], + "properties": { + "status": { + "type": "string", + "enum": ["false_positive", "true_positive", "needs_review"], + "description": "Validation outcome: false_positive (Stage 3 wrong, user right), true_positive (Stage 3 correct, user wrong), needs_review (ambiguous)" + }, + "confidence": { + "type": "integer", + "minimum": 0, + "maximum": 100, + "description": "Confidence in this decision" + }, + "primary_reason": { + "type": "string", + "description": "Main reason for this decision" + } + } + }, + "error_pattern": { + "type": "object", + "description": "Classification of what type of error Stage 3 made (if any)", + "required": ["error_type", "explanation"], + "properties": { + "error_type": { + "type": "string", + "enum": [ + "knowledge_cutoff", + "temporal_confusion", + "insufficient_search", + "misinterpretation", + "correct_detection", + "ambiguous" + ], + "description": "Type of error Stage 3 made, if any" + }, + "explanation": { + "type": "string", + "description": "Brief explanation of why this error type was identified" + } + } + }, + "prompt_improvement_suggestion": { + "type": ["string", "null"], + "description": "If false_positive, what specific improvement to Stage 3 prompt could prevent this error in future" + }, + 
"thought_summaries": { + "type": "string", + "description": "Detailed reasoning process including searches performed, evidence found, and how the decision was reached" + } + } +} diff --git a/prompts/snippet_feedback_validation/system_instruction.md b/prompts/snippet_feedback_validation/system_instruction.md new file mode 100644 index 0000000..90d0982 --- /dev/null +++ b/prompts/snippet_feedback_validation/system_instruction.md @@ -0,0 +1,12 @@ +**Role:** You are a validation system reviewing user feedback on potential misinformation detections. + +**Context:** Our Stage 3 pipeline analyzes Spanish/Arabic radio content for disinformation targeting US immigrant communities. Users can dispute flagged snippets. Your role: validate these disputes. + +**Validation Outcomes:** +- **false_positive**: Stage 3 was WRONG (content is NOT misinformation) → feeds Phase 2 prompt refinement +- **true_positive**: Stage 3 was CORRECT (content IS misinformation) → no action needed +- **needs_review**: Evidence mixed/ambiguous → requires human review + +**Core Principles:** +- Do not assume user feedback OR original classification is correct +- Output must conform to the JSON schema in the task prompt diff --git a/prompts/snippet_feedback_validation/user_prompt.md b/prompts/snippet_feedback_validation/user_prompt.md new file mode 100644 index 0000000..364861c --- /dev/null +++ b/prompts/snippet_feedback_validation/user_prompt.md @@ -0,0 +1,221 @@ +# Feedback Validation Task + +You are validating user feedback on a snippet that was flagged for potential misinformation. + +--- + +## Original Snippet Analysis (from Stage 3) + +### Metadata + +**Snippet ID:** {snippet_id} +**Recording Date:** {recorded_at} +**Radio Station:** {radio_station_name} ({radio_station_code}), {location_state} + +### Full Transcription + +#### Transcription (Original Language) +{transcription} + +#### Translation (English) +{translation} + +*Note: This is the complete transcription of the audio clip. See "Snippet Context" below for the specific flagged portion with surrounding context.* + +### Snippet Context + +**Before (Original):** +{context_before} + +**Before (English):** +{context_before_en} + +**Main Snippet (Original):** +{context_main} + +**Main Snippet (English):** +{context_main_en} + +**After (Original):** +{context_after} + +**After (English):** +{context_after_en} + +### Stage 3 Assessment + +#### Title +- Spanish: {title_spanish} +- English: {title_english} + +#### Summary +- Spanish: {summary_spanish} +- English: {summary_english} + +#### Explanation (Why it was flagged) +- Spanish: {explanation_spanish} +- English: {explanation_english} + +#### Disinformation Categories +{disinformation_categories} + +#### Confidence Scores +- Overall: {confidence_overall} +- Category Scores: {category_scores} + +#### Claims Analysis +{claims_analysis} + +### Keywords That Triggered Flag +{keywords_detected} + +### Stage 3 Search Evidence +{grounding_metadata} + +### Stage 3 Reasoning Process +{thought_summaries_stage3} + +--- + +## User Feedback + +**Total Dislikes:** {dislike_count} + +### User-Applied Labels +{user_labels} + +### User Comments +{user_comments} + +--- + +## Verification Guidelines + +### Verification Protocol + +1. **Factual Claims**: Search for BOTH current status AND status at recording date. Require 2+ authoritative sources. If Stage 3 claims something is false, search for evidence BOTH supporting AND refuting the original claim. + +2. 
**Knowledge Cutoff Issues**: Search "[entity/topic] current [year]" AND "[entity/topic] [recording_date_year]". If Stage 3 claims something "doesn't exist" or is "fictional", verify its creation date: recent entities may not have been in Stage 3's training data.
+
+3. **Temporal Confusion**: Verify whether Stage 3 used data from the correct time period. Compare Stage 3's search evidence with your current search results. A claim may be true now yet have been false at recording time, or vice versa.
+
+4. **Evaluating Stage 3 Search Quality**: Did Stage 3 search for the right terms? Find relevant sources? Use searches specific to the recording date/location? Miss obvious searches that would have changed the conclusion?
+
+---
+
+## Error Pattern Reference
+
+| Error Type | How to Identify |
+|------------|-----------------|
+| `knowledge_cutoff` | Stage 3 says something doesn't exist that was created after its training cutoff |
+| `temporal_confusion` | Stage 3 used data from the wrong time period |
+| `insufficient_search` | Stage 3's search evidence shows inadequate or no searches for a verifiable claim |
+| `misinterpretation` | Stage 3's reasoning shows a logical error despite having correct information |
+| `correct_detection` | Stage 3 was right; the user feedback is wrong |
+| `ambiguous` | Mixed evidence; unclear which side is correct |
+
+---
+
+## User Feedback Assessment
+
+**Adversarial Signals (Lower Quality):**
+- Generic disagreement without specific reasoning
+- Attacks on the system rather than on specific content
+- Labels that don't match the content
+- No explanation of WHY the classification is wrong
+
+**High-Quality Signals:**
+- Cites specific claims from the snippet
+- Provides evidence, sources, or links
+- Explains specifically why the classification is wrong
+- Identifies factual errors in Stage 3's analysis
+
+---
+
+## Handling Special Cases
+
+**Minimal User Feedback (dislikes only, no comments/labels):**
+- Focus verification on Stage 3's weakest claims (lowest scores, vaguest evidence)
+- If Stage 3's analysis appears solid with no counter-evidence → lean `true_positive`
+- If Stage 3's analysis has gaps → lean `needs_review`
+
+**Empty/Minimal Stage 3 Search Evidence:**
+- Strong signal for the `insufficient_search` error pattern
+- Perform the searches Stage 3 should have done
+- If searches reveal Stage 3 was wrong → `false_positive` with `insufficient_search`
+- If searches confirm Stage 3 was right → `true_positive`
+
+**Conflicting Claim Verifications:**
+- If MOST claims are verified as false → `true_positive`
+- If MOST claims are verified as true → `false_positive`
+- If roughly equal, or key claims conflict → `needs_review`
+- Weight higher-confidence claims more heavily
+
+---
+
+## Your Task
+
+**Current Date:** {current_date}
+
+1. **Review** Stage 3's analysis, search evidence, and reasoning
+2. **Analyze** user feedback quality
+3. **Verify** claims using web search (follow the Verification Protocol above)
+4. **Classify** the error pattern if Stage 3 erred
+5. 
**Determine** the validation outcome + +**Claim Verification Priority:** +- Verify ALL claims in "Claims Analysis" above +- If none documented, identify main factual assertions from the explanation +- Prioritize claims the user specifically disputes + +--- + +## Output Format + +Provide your validation result in the following JSON format: + +```json +{{ + "original_claim_summary": "Brief summary of what Stage 3 flagged as misinformation", + "user_feedback_summary": "Brief summary of user feedback and their apparent reasoning", + "claim_verifications": [ + {{ + "claim": "The specific claim being verified", + "original_assessment": "What Stage 3 concluded about this claim", + "verification_finding": "What your web search reveals about this claim", + "is_claim_actually_false": true/false, + "confidence": 0-100 + }} + ], + "user_feedback_assessment": {{ + "feedback_quality": "high/medium/low", + "feedback_reasoning": "Assessment of why user disliked/labeled the snippet", + "appears_adversarial": true/false + }}, + "validation_decision": {{ + "status": "false_positive/true_positive/needs_review", + "confidence": 0-100, + "primary_reason": "Main reason for this decision" + }}, + "error_pattern": {{ + "error_type": "knowledge_cutoff/temporal_confusion/insufficient_search/misinterpretation/correct_detection/ambiguous", + "explanation": "Brief explanation of why this error type was identified" + }}, + "prompt_improvement_suggestion": "If false_positive, what specific improvement to Stage 3 prompt could prevent this error (null if not applicable)", + "thought_summaries": "Detailed reasoning process including searches performed, evidence found, comparison with Stage 3's work, and how the decision was reached" +}} +``` + +--- + +## Confidence Guidelines + +- **90-100**: Clear-cut case with strong evidence; no reasonable doubt +- **70-89**: Strong evidence but minor ambiguities remain +- **50-69**: Evidence leans one way but notable uncertainty exists +- **Below 50**: You MUST use `needs_review` - do not make low-confidence calls + +Use `needs_review` when: +- Evidence is genuinely mixed (credible sources disagree) +- Claim is inherently subjective/interpretive +- Required information is unavailable diff --git a/src/processing_pipeline/constants.py b/src/processing_pipeline/constants.py index 493c18f..0b72a06 100644 --- a/src/processing_pipeline/constants.py +++ b/src/processing_pipeline/constants.py @@ -85,6 +85,16 @@ def get_gemini_timestamped_transcription_generation_prompt(): return open("prompts/Gemini_timestamped_transcription_generation_prompt.md", "r").read() +def get_system_instruction_for_feedback_validation(): + with open("prompts/snippet_feedback_validation/system_instruction.md", "r") as f: + return f.read() + + +def get_user_prompt_for_feedback_validation(): + with open("prompts/snippet_feedback_validation/user_prompt.md", "r") as f: + return f.read() + + if __name__ == "__main__": # Print the output schema for stage 1 # output_schema_for_stage_1 = get_output_schema_for_stage_1() diff --git a/src/processing_pipeline/main.py b/src/processing_pipeline/main.py index 6649536..e61f15e 100644 --- a/src/processing_pipeline/main.py +++ b/src/processing_pipeline/main.py @@ -1,12 +1,14 @@ import os from dotenv import load_dotenv from prefect import serve +from prefect.schedules import Cron import sentry_sdk from processing_pipeline.stage_1 import initial_disinformation_detection, redo_main_detection, regenerate_timestamped_transcript, undo_disinformation_detection from processing_pipeline.stage_2 
import audio_clipping, undo_audio_clipping from processing_pipeline.stage_3 import in_depth_analysis from processing_pipeline.stage_5 import embedding from processing_pipeline.stage_4 import analysis_review +from processing_pipeline.snippet_feedback_validation import snippet_feedback_validation load_dotenv() # Setup Sentry @@ -88,5 +90,13 @@ parameters=dict(repeat=True), ) serve(deployment, limit=100) + case "snippet_feedback_validation": + deployment = snippet_feedback_validation.to_deployment( + name="Snippet Feedback Validation", + concurrency_limit=1, + parameters=dict(lookback_days=1, limit=None), + schedule=Cron("0 6 * * *", timezone="UTC"), + ) + serve(deployment, limit=1) case _: raise ValueError(f"Invalid process group: {process_group}") diff --git a/src/processing_pipeline/snippet_feedback_validation/__init__.py b/src/processing_pipeline/snippet_feedback_validation/__init__.py new file mode 100644 index 0000000..3921cec --- /dev/null +++ b/src/processing_pipeline/snippet_feedback_validation/__init__.py @@ -0,0 +1,4 @@ +from .snippet_feedback_validation import snippet_feedback_validation +from .models import FeedbackValidationOutput + +__all__ = ["snippet_feedback_validation", "FeedbackValidationOutput"] diff --git a/src/processing_pipeline/snippet_feedback_validation/models.py b/src/processing_pipeline/snippet_feedback_validation/models.py new file mode 100644 index 0000000..092d4f8 --- /dev/null +++ b/src/processing_pipeline/snippet_feedback_validation/models.py @@ -0,0 +1,89 @@ +from typing import Literal +from pydantic import BaseModel, Field + + +class VerificationResult(BaseModel): + """Verification result for a specific claim.""" + + claim: str = Field(description="The specific claim being verified") + original_assessment: str = Field(description="What Stage 3 concluded about this claim") + verification_finding: str = Field(description="What web search reveals about this claim") + is_claim_actually_false: bool = Field(description="Whether the claim is demonstrably false") + confidence: int = Field(ge=0, le=100, description="Confidence in this verification") + + +class UserFeedbackAssessment(BaseModel): + """Assessment of user-provided feedback quality.""" + + feedback_quality: Literal["high", "medium", "low"] = Field( + description="Quality of user-provided feedback" + ) + feedback_reasoning: str = Field( + description="Assessment of why user disliked/labeled the snippet" + ) + appears_adversarial: bool = Field( + description="Whether feedback appears to be bad-faith or coordinated" + ) + + +class ValidationDecision(BaseModel): + """Final validation decision using ML terminology.""" + + status: Literal["false_positive", "true_positive", "needs_review"] = Field( + description="Validation outcome: false_positive (Stage 3 wrong, user right), true_positive (Stage 3 correct, user wrong), needs_review (ambiguous)" + ) + confidence: int = Field(ge=0, le=100, description="Confidence in this decision") + primary_reason: str = Field(description="Main reason for this decision") + + +class ErrorPatternDetected(BaseModel): + """Classification of the error type Stage 3 made.""" + + error_type: Literal[ + "knowledge_cutoff", # Stage 3 claimed something doesn't exist that was created after cutoff + "temporal_confusion", # Stage 3 applied wrong time context + "insufficient_search", # Stage 3 didn't search deeply enough + "misinterpretation", # Stage 3 misunderstood the content + "correct_detection", # No error - Stage 3 was right (use for true_positive) + "ambiguous", # Cannot determine 
error type + ] = Field(description="Type of error Stage 3 made, if any") + explanation: str = Field(description="Brief explanation of why this error type was identified") + + +class FeedbackValidationOutput(BaseModel): + """Output schema for feedback validation task.""" + + # Summary of what's being validated + original_claim_summary: str = Field( + description="Brief summary of what Stage 3 flagged as misinformation" + ) + user_feedback_summary: str = Field( + description="Brief summary of user feedback and their apparent reasoning" + ) + + # Verification results + claim_verifications: list[VerificationResult] = Field( + description="Verification of each major claim from the original analysis" + ) + + # Assessment of user feedback + user_feedback_assessment: UserFeedbackAssessment + + # Final decision + validation_decision: ValidationDecision + + # Error pattern classification + error_pattern: ErrorPatternDetected = Field( + description="Classification of what type of error Stage 3 made (if any)" + ) + + # Prompt improvement suggestion for Phase 2 + prompt_improvement_suggestion: str | None = Field( + default=None, + description="If false_positive, what specific improvement to Stage 3 prompt could prevent this error in future" + ) + + # Thought process (reasoning captured here instead of separate field) + thought_summaries: str = Field( + description="Detailed reasoning process including searches performed, evidence found, and how the decision was reached" + ) diff --git a/src/processing_pipeline/snippet_feedback_validation/snippet_feedback_validation.py b/src/processing_pipeline/snippet_feedback_validation/snippet_feedback_validation.py new file mode 100644 index 0000000..5237663 --- /dev/null +++ b/src/processing_pipeline/snippet_feedback_validation/snippet_feedback_validation.py @@ -0,0 +1,427 @@ +from datetime import datetime, timezone, timedelta +import json +import os +import time + +from google.genai.types import GoogleSearch, Tool +from pydantic import ValidationError + +from processing_pipeline.supabase_utils import SupabaseClient +from processing_pipeline.constants import ( + GeminiModel, + get_system_instruction_for_feedback_validation, + get_user_prompt_for_feedback_validation, +) +from services.gemini_client import GeminiClient +from .models import FeedbackValidationOutput +from utils import optional_flow, optional_task + + +def format_grounding_metadata(grounding_metadata) -> str: + """Format grounding metadata into a readable summary of searches performed.""" + if not grounding_metadata: + return "No search evidence available from Stage 3." 
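+
+    # Stage 3 may have stored its search evidence as a JSON string, a list of
+    # tool calls (CLI method format), or a dict (SDK grounding format); each
+    # shape is handled below.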
+    if isinstance(grounding_metadata, str):
+        try:
+            grounding_metadata = json.loads(grounding_metadata)
+        except json.JSONDecodeError:
+            # Not JSON; pass the raw string through as-is
+            return grounding_metadata
+
+    lines = []
+
+    # If it's a list of tool calls (CLI method format)
+    if isinstance(grounding_metadata, list):
+        for i, item in enumerate(grounding_metadata, 1):
+            if isinstance(item, dict):
+                params = item.get("parameters", item.get("input", {}))
+                output = item.get("output", item.get("result", ""))
+
+                # Extract the search query if present
+                query = None
+                if isinstance(params, dict):
+                    query = params.get("query", params.get("q", params.get("search_query")))
+                elif isinstance(params, str):
+                    query = params
+
+                if query:
+                    lines.append(f"**Search {i}:** {query}")
+                    if output:
+                        output_str = str(output)
+                        if len(output_str) <= 500:
+                            lines.append(f"  Result: {output_str}")
+                        else:
+                            lines.append(f"  Result: {output_str[:500]}... [truncated - {len(output_str)} chars total]")
+
+    # If it's a dict (SDK method format)
+    elif isinstance(grounding_metadata, dict):
+        # Handle the Google Search grounding format
+        if "search_entry_point" in grounding_metadata:
+            rendered_content = grounding_metadata.get("search_entry_point", {}).get("rendered_content", "")
+            if rendered_content:
+                lines.append(f"**Search context:** {rendered_content[:500]}")
+
+        if "grounding_chunks" in grounding_metadata:
+            chunks = grounding_metadata["grounding_chunks"]
+            for i, chunk in enumerate(chunks[:10], 1):  # Limit to the first 10 chunks
+                web = chunk.get("web", {}) if isinstance(chunk, dict) else {}
+                uri = web.get("uri", "")
+                title = web.get("title", "")
+                if uri or title:
+                    lines.append(f"**Source {i}:** [{title}]({uri})" if title else f"**Source {i}:** {uri}")
+
+        if "grounding_supports" in grounding_metadata:
+            supports = grounding_metadata["grounding_supports"]
+            for support in supports[:5]:  # Limit to the first 5 supports
+                segment = support.get("segment", {})
+                text = segment.get("text", "")
+                if text:
+                    lines.append(f"- Supported claim: \"{text[:200]}...\"" if len(text) > 200 else f"- Supported claim: \"{text}\"")
+
+        # Fallback for unrecognized dict shapes: dump the top-level keys
+        if not lines:
+            for key, value in grounding_metadata.items():
+                if value and key != "search_entry_point":
+                    lines.append(f"**{key}:** {str(value)[:300]}")
+
+    if not lines:
+        return "Stage 3 search metadata format not recognized. Raw data available but could not be parsed."
+
+    # The collected lines become the "Stage 3 Search Evidence" section of the
+    # validation prompt.
+ + return "\n".join(lines) + + +def stringify_liveblocks_body(body) -> str: + if not body: + return "" + + if isinstance(body, str): + try: + body = json.loads(body) + except json.JSONDecodeError: + return body + + if not isinstance(body, dict): + return str(body) + + def stringify_text(element: dict) -> str: + text = element.get("text", "") + if not text: + return "" + if element.get("bold"): + text = f"**{text}**" + if element.get("italic"): + text = f"_{text}_" + if element.get("strikethrough"): + text = f"~~{text}~~" + if element.get("code"): + text = f"`{text}`" + return text + + def stringify_link(element: dict) -> str: + url = element.get("url", "") + text = element.get("text") or url + return f"[{text}]({url})" + + def stringify_mention(_element: dict) -> str: + return "@[user]" + + def stringify_inline(inline: dict) -> str: + inline_type = inline.get("type") + if inline_type == "link": + return stringify_link(inline) + elif inline_type == "mention": + return stringify_mention(inline) + elif "text" in inline: + return stringify_text(inline) + return "" + + def stringify_paragraph(block: dict) -> str: + children = block.get("children", []) + return "".join(stringify_inline(child) for child in children) + + content = body.get("content", []) + paragraphs = [] + for block in content: + if block.get("type") == "paragraph": + paragraphs.append(stringify_paragraph(block)) + + return "\n\n".join(paragraphs).strip() + + +@optional_task(log_prints=True, retries=3) +def fetch_snippets_with_dislikes(supabase_client: SupabaseClient, since_date, limit: int | None = None): + snippets = supabase_client.get_snippets_with_recent_dislikes( + since_date=since_date, + exclude_validated=True, + limit=limit, + ) + print(f"Found {len(snippets)} snippet(s) with dislikes (excluding already validated)") + return snippets + + +@optional_task(log_prints=True) +def build_validation_prompt(snippet, comments): + template = get_user_prompt_for_feedback_validation() + + # Extract snippet data + audio_file = snippet["audio_file"] + confidence_scores = snippet["confidence_scores"] + title = snippet["title"] + summary = snippet["summary"] + explanation = snippet["explanation"] + + # Format disinformation categories + categories = snippet["disinformation_categories"] + if categories: + parsed_categories = [f"- [EN] {cat['english']} / [ES] {cat['spanish']}" for cat in categories] + categories_text = "\n".join(parsed_categories) + else: + categories_text = "No categories" + + # Format claims analysis + claims = confidence_scores["analysis"]["claims"] + if claims: + parsed_claims = [ + f"- Claim: \"{claim.get('quote', '')}\"\n Evidence: {claim.get('evidence', '')}\n Score: {claim.get('score', 'N/A')}" + for claim in claims + ] + claims_text = "\n".join(parsed_claims) + else: + claims_text = "No specific claims documented" + + # Format user labels + user_labels = snippet["labels"] + if user_labels: + parsed_labels = [ + f"- {label['text']} (applied at: {label['created_at']}, upvotes: {label['upvote_count']})" + for label in user_labels + ] + labels_text = "\n".join(parsed_labels) + else: + labels_text = "No user-applied labels" + + # Format comments + if comments: + parsed_comments = [ + f"- [{comment['comment_at']}] {stringify_liveblocks_body(comment['body'])}" for comment in comments + ] + comments_text = "\n".join(parsed_comments) + else: + comments_text = "No comments" + + # Format context (before/main/after) + context = snippet.get("context", {}) + context_before = context.get("before", "Not available") + 
context_before_en = context.get("before_en", "Not available") + context_main = context.get("main", "Not available") + context_main_en = context.get("main_en", "Not available") + context_after = context.get("after", "Not available") + context_after_en = context.get("after_en", "Not available") + + # Format keywords detected + keywords = snippet.get("keywords_detected", []) + keywords_text = ", ".join(keywords) if keywords else "None detected" + + # Format grounding metadata (Stage 3 search evidence) + grounding_metadata = snippet.get("grounding_metadata") + grounding_metadata_text = format_grounding_metadata(grounding_metadata) + + # Get Stage 3 thought summaries + thought_summaries_stage3 = snippet.get("thought_summaries", "Not available") + + # Fill template + prompt = template.format( + snippet_id=snippet["id"], + recorded_at=snippet["recorded_at"], + radio_station_name=audio_file["radio_station_name"], + radio_station_code=audio_file["radio_station_code"], + location_state=audio_file["location_state"], + transcription=snippet["transcription"], + translation=snippet["translation"], + context_before=context_before, + context_before_en=context_before_en, + context_main=context_main, + context_main_en=context_main_en, + context_after=context_after, + context_after_en=context_after_en, + title_spanish=title["spanish"], + title_english=title["english"], + summary_spanish=summary["spanish"], + summary_english=summary["english"], + explanation_spanish=explanation["spanish"], + explanation_english=explanation["english"], + disinformation_categories=categories_text, + confidence_overall=confidence_scores["overall"], + category_scores=json.dumps(confidence_scores["categories"], indent=2), + claims_analysis=claims_text, + keywords_detected=keywords_text, + grounding_metadata=grounding_metadata_text, + thought_summaries_stage3=thought_summaries_stage3, + dislike_count=snippet["dislike_count"], + user_labels=labels_text, + user_comments=comments_text, + current_date=datetime.now(timezone.utc).strftime("%B %d, %Y %I:%M %p UTC"), + ) + + return prompt + + +@optional_task(log_prints=True, retries=3) +def validate_with_gemini(gemini_client: GeminiClient, model_name: GeminiModel, user_prompt: str): + system_instruction = get_system_instruction_for_feedback_validation() + + return gemini_client.generate_content( + model=model_name, + user_prompt=user_prompt, + system_instruction=system_instruction, + max_output_tokens=8192, + thinking_budget=2048, + tools=[Tool(google_search=GoogleSearch())], + error_prefix="[FEEDBACK_VALIDATION]", + ) + + +@optional_task(log_prints=True) +def parse_validation_response(response_text): + try: + start_idx = response_text.find("{") + end_idx = response_text.rfind("}") + + if start_idx == -1 or end_idx == -1: + raise ValueError("No JSON object found in response") + + parsed = FeedbackValidationOutput.model_validate_json(response_text[start_idx : end_idx + 1]) + return parsed.model_dump() + except ValidationError as e: + print(f"Validation error: {e}") + raise + + +@optional_task(log_prints=True, retries=3) +def save_validation_result( + supabase_client: SupabaseClient, + snippet_id, + parsed_response, + grounding_metadata, + thought_summaries, + model_name, + input_snippet_data, + input_user_feedback, + dislike_count, +): + decision = parsed_response["validation_decision"] + error_pattern = parsed_response["error_pattern"] + + supabase_client.insert_feedback_validation_result( + snippet_id=snippet_id, + validation_status=decision["status"], + 
validation_confidence=decision["confidence"], + original_claim_summary=parsed_response["original_claim_summary"], + user_feedback_summary=parsed_response["user_feedback_summary"], + input_snippet_data=input_snippet_data, + input_user_feedback=input_user_feedback, + validated_by=model_name, + grounding_metadata=grounding_metadata, + thought_summaries=thought_summaries or parsed_response["thought_summaries"], + dislike_count_at_validation=dislike_count, + error_pattern=error_pattern["error_type"], + error_pattern_explanation=error_pattern["explanation"], + prompt_improvement_suggestion=parsed_response["prompt_improvement_suggestion"], + ) + + print(f"Saved validation result: {decision['status']} (confidence: {decision['confidence']})") + + +@optional_task(log_prints=True) +def process_snippet( + supabase_client: SupabaseClient, + gemini_client: GeminiClient, + model_name: GeminiModel, + snippet, +): + snippet_id = snippet["id"] + print(f"Processing snippet: {snippet_id}") + + try: + user_prompt = build_validation_prompt(snippet, snippet["comments"]) + gemini_response = validate_with_gemini(gemini_client, model_name, user_prompt) + parsed_response = parse_validation_response(gemini_response["text"]) + + # Prepare input data for audit + input_snippet_data = { + k: v + for k, v in snippet.items() + if k not in ["grounding_metadata", "thought_summaries", "labels", "comments"] + } + + # Save result + save_validation_result( + supabase_client=supabase_client, + snippet_id=snippet_id, + parsed_response=parsed_response, + grounding_metadata=gemini_response["grounding_metadata"], + thought_summaries=gemini_response["thought_summaries"], + model_name=model_name, + input_snippet_data=input_snippet_data, + input_user_feedback={ + "labels": snippet["labels"], + "comments": snippet["comments"], + }, + dislike_count=snippet["dislike_count"], + ) + + print( + f"Validation complete for snippet {snippet_id}: {parsed_response['validation_decision']['status']}\n\n" + f"Error pattern: {parsed_response['error_pattern']['error_type']}" + ) + return True + + except Exception as e: + print(f"Error processing snippet {snippet_id}: {e}") + return False + + +@optional_flow( + name="Snippet Feedback Validation", + log_prints=True, + timeout_seconds=3600, +) +def snippet_feedback_validation(lookback_days: int, limit: int | None): + gemini_key = os.getenv("GOOGLE_GEMINI_KEY") + if not gemini_key: + raise ValueError("GOOGLE_GEMINI_KEY environment variable is not set") + + gemini_client = GeminiClient(api_key=gemini_key) + supabase_client = SupabaseClient( + supabase_url=os.getenv("SUPABASE_URL"), + supabase_key=os.getenv("SUPABASE_KEY"), + ) + + if lookback_days > 0: + since_date = datetime.now(timezone.utc) - timedelta(days=lookback_days) + else: + since_date = None + + snippets = fetch_snippets_with_dislikes(supabase_client, since_date, limit) + + if not snippets: + return + + success_count = 0 + error_count = 0 + + for snippet in snippets: + result = process_snippet( + supabase_client=supabase_client, + gemini_client=gemini_client, + model_name=GeminiModel.GEMINI_2_5_PRO, + snippet=snippet, + ) + + if result: + success_count += 1 + else: + error_count += 1 + + time.sleep(2) + + print(f"Feedback validation complete: {success_count} successful, {error_count} errors") diff --git a/src/processing_pipeline/supabase_utils.py b/src/processing_pipeline/supabase_utils.py index 24b539b..008b47a 100644 --- a/src/processing_pipeline/supabase_utils.py +++ b/src/processing_pipeline/supabase_utils.py @@ -434,3 +434,58 @@ def 
upsert_snippet_embedding(self, snippet_id, snippet_document, document_token_ def delete_vector_embedding_of_snippet(self, snippet_id): response = self.client.table("snippet_embeddings").delete().eq("snippet", snippet_id).execute() return response.data + + def get_snippets_with_recent_dislikes( + self, + since_date: datetime | None = None, + exclude_validated: bool = True, + limit: int | None = None, + ): + response = self.client.rpc( + "get_snippets_with_recent_dislikes", + { + "p_since_date": since_date.isoformat() if since_date else None, + "p_exclude_validated": exclude_validated, + "p_limit": limit, + } + ).execute() + return response.data or [] + + def insert_feedback_validation_result( + self, + snippet_id, + validation_status, + validation_confidence, + original_claim_summary, + user_feedback_summary, + input_snippet_data, + input_user_feedback, + validated_by, + grounding_metadata, + thought_summaries, + dislike_count_at_validation, + error_pattern, + error_pattern_explanation, + prompt_improvement_suggestion, + ): + response = ( + self.client.table("snippet_feedback_validation_results") + .insert({ + "snippet": snippet_id, + "validation_status": validation_status, + "validation_confidence": validation_confidence, + "original_claim_summary": original_claim_summary, + "user_feedback_summary": user_feedback_summary, + "input_snippet_data": input_snippet_data, + "input_user_feedback": input_user_feedback, + "validated_by": validated_by, + "grounding_metadata": grounding_metadata, + "thought_summaries": thought_summaries, + "dislike_count_at_validation": dislike_count_at_validation, + "error_pattern": error_pattern, + "error_pattern_explanation": error_pattern_explanation, + "prompt_improvement_suggestion": prompt_improvement_suggestion, + }) + .execute() + ) + return response.data[0] if response.data else None diff --git a/src/services/gemini_client.py b/src/services/gemini_client.py new file mode 100644 index 0000000..26e292d --- /dev/null +++ b/src/services/gemini_client.py @@ -0,0 +1,96 @@ +from google import genai +from google.genai.types import ( + GenerateContentConfig, + HarmBlockThreshold, + HarmCategory, + SafetySetting, + ThinkingConfig, + Tool, +) +from pydantic import BaseModel + +from processing_pipeline.constants import GeminiModel + + +class GeminiClient: + def __init__(self, api_key: str): + self.client = genai.Client(api_key=api_key) + + def generate_content( + self, + *, + model: GeminiModel, + user_prompt: str, + system_instruction: str, + max_output_tokens: int = 8192, + thinking_budget: int = 1024, + response_schema: dict | BaseModel | None = None, + tools: list[Tool] | None = None, + error_prefix: str | None = None, + ) -> dict: + config = GenerateContentConfig( + system_instruction=system_instruction, + max_output_tokens=max_output_tokens, + tools=tools, + thinking_config=ThinkingConfig(thinking_budget=thinking_budget, include_thoughts=True), + safety_settings=self._get_safety_settings(), + ) + + if response_schema: + config.response_mime_type = "application/json" + config.response_schema = response_schema + + response = self.client.models.generate_content( + model=model, + contents=[user_prompt], + config=config, + ) + + if not response.candidates: + raise ValueError(f"{error_prefix}: No candidates returned from Gemini.") + + text = response.text + if not text: + finish_reason = response.candidates[0].finish_reason + print(f"Response finish reason: {finish_reason}") + raise ValueError(f"{error_prefix}: No response from Gemini. 
Finish reason: {finish_reason}.") + + thought_summaries = "" + for part in response.candidates[0].content.parts: + if part.thought and part.text: + thought_summaries += part.text + + grounding_metadata = None + if response.candidates[0].grounding_metadata: + grounding_metadata = response.candidates[0].grounding_metadata.model_dump_json(indent=2) + + return { + "text": text, + "parsed": response.parsed, + "grounding_metadata": grounding_metadata, + "thought_summaries": thought_summaries, + } + + def _get_safety_settings(self): + return [ + SafetySetting( + category=HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT, + threshold=HarmBlockThreshold.BLOCK_NONE, + ), + SafetySetting( + category=HarmCategory.HARM_CATEGORY_HATE_SPEECH, + threshold=HarmBlockThreshold.BLOCK_NONE, + ), + SafetySetting( + category=HarmCategory.HARM_CATEGORY_HARASSMENT, + threshold=HarmBlockThreshold.BLOCK_NONE, + ), + SafetySetting( + category=HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT, + threshold=HarmBlockThreshold.BLOCK_NONE, + ), + SafetySetting( + category=HarmCategory.HARM_CATEGORY_CIVIC_INTEGRITY, + threshold=HarmBlockThreshold.BLOCK_NONE, + ), + ] diff --git a/supabase/database/sql/get_snippets_with_recent_dislikes.sql b/supabase/database/sql/get_snippets_with_recent_dislikes.sql new file mode 100644 index 0000000..3e344c1 --- /dev/null +++ b/supabase/database/sql/get_snippets_with_recent_dislikes.sql @@ -0,0 +1,101 @@ +DROP FUNCTION IF EXISTS get_snippets_with_recent_dislikes; + +CREATE OR REPLACE FUNCTION get_snippets_with_recent_dislikes( + p_since_date TIMESTAMPTZ DEFAULT NULL, + p_exclude_validated BOOLEAN DEFAULT TRUE, + p_limit INTEGER DEFAULT NULL +) RETURNS JSONB LANGUAGE plpgsql AS $$ +DECLARE + result JSONB; +BEGIN + WITH dislike_counts AS ( + SELECT + uls.snippet, + COUNT(*) AS dislike_count + FROM user_like_snippets uls + WHERE uls.value = -1 + AND (p_since_date IS NULL OR uls.created_at >= p_since_date) + GROUP BY uls.snippet + ), + filtered_snippets AS ( + SELECT dc.snippet, dc.dislike_count + FROM dislike_counts dc + WHERE NOT p_exclude_validated + OR NOT EXISTS ( + SELECT 1 + FROM snippet_feedback_validation_results sfvr + WHERE sfvr.snippet = dc.snippet + ) + ), + limited_snippets AS ( + SELECT + s.id, + s.recorded_at, + s.transcription, + s.translation, + s.title, + s.summary, + s.explanation, + s.disinformation_categories, + s.confidence_scores, + s.audio_file, + fs.dislike_count + FROM filtered_snippets fs + JOIN snippets s ON s.id = fs.snippet + ORDER BY fs.dislike_count DESC, s.recorded_at DESC + LIMIT p_limit + ), + snippet_labels_agg AS ( + SELECT + sl.snippet, + jsonb_agg( + jsonb_build_object( + 'text', l.text, + 'created_at', sl.created_at, + 'upvote_count', sl.upvote_count + ) + ) AS labels + FROM snippet_labels sl + JOIN labels l ON l.id = sl.label + WHERE sl.snippet IN (SELECT id FROM limited_snippets) + AND sl.applied_by IS NOT NULL + GROUP BY sl.snippet + ), + comments_agg AS ( + SELECT + c.room_id AS snippet, + jsonb_agg( + jsonb_build_object( + 'body', c.body, + 'comment_at', c.comment_at + ) + ORDER BY c.comment_at ASC + ) AS comments + FROM comments c + WHERE c.room_id IN (SELECT id FROM limited_snippets) + GROUP BY c.room_id + ) + SELECT COALESCE( + jsonb_agg( + to_jsonb(ls) || jsonb_build_object( + 'audio_file', jsonb_build_object( + 'radio_station_name', af.radio_station_name, + 'radio_station_code', af.radio_station_code, + 'location_state', af.location_state + ), + 'labels', COALESCE(sla.labels, '[]'::jsonb), + 'comments', COALESCE(ca.comments, '[]'::jsonb) + ) + ORDER BY 
ls.dislike_count DESC, ls.recorded_at DESC + ), + '[]'::jsonb + ) + INTO result + FROM limited_snippets ls + LEFT JOIN audio_files af ON af.id = ls.audio_file + LEFT JOIN snippet_labels_agg sla ON sla.snippet = ls.id + LEFT JOIN comments_agg ca ON ca.snippet = ls.id; + + RETURN result; +END; +$$;
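+
+-- Example invocation (hypothetical values), mirroring the parameters that the
+-- pipeline's SupabaseClient.get_snippets_with_recent_dislikes() passes in:
+--
+--   SELECT get_snippets_with_recent_dislikes(
+--       p_since_date := now() - interval '1 day',
+--       p_exclude_validated := TRUE,
+--       p_limit := NULL  -- NULL applies no LIMIT
+--   );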