From af7d9752426cd65adf00bd81237799eb4b0cd035 Mon Sep 17 00:00:00 2001 From: The Commissioner Date: Mon, 19 Jan 2026 22:16:39 -0600 Subject: [PATCH] adding user-space workflows + db tables --- CLAUDE.md | 266 +++ src/kurt/cli/workflows.py | 457 +++++ src/kurt/db/sqlite.py | 8 + src/kurt/integrations/research/cli.py | 114 +- src/kurt/integrations/research/config.py | 31 +- .../research/monitoring/__init__.py | 10 +- .../integrations/research/monitoring/apify.py | 684 ++++++++ .../research/tests/test_adapters.py | 442 +++++ src/kurt/workflows/__init__.py | 53 +- src/kurt/workflows/signals/config.py | 14 +- src/kurt/workflows/signals/steps.py | 42 + technically-newsroom-vision.md | 1533 +++++++++++++++++ 12 files changed, 3637 insertions(+), 17 deletions(-) create mode 100644 src/kurt/integrations/research/monitoring/apify.py create mode 100644 technically-newsroom-vision.md diff --git a/CLAUDE.md b/CLAUDE.md index 81abf5fd..fd71358d 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -998,3 +998,269 @@ Context about the task. | Quick task | 5-10 | 50,000 | 120 | | Standard | 15-25 | 150,000 | 600 | | Complex | 30-50 | 300,000 | 1800 | + +--- + +## YAML Workflows (Multi-Step Pipelines) + +YAML workflows enable deterministic, multi-step pipelines with Python, LLM, and agentic steps. Unlike agent workflows (which are open-ended Claude Code sessions), YAML workflows have explicit steps with defined inputs, outputs, and data flow. + +### Workflow Definition Format + +```yaml +# workflows/my-pipeline.yaml +name: my-pipeline +title: My Processing Pipeline +description: | + Description of what this workflow does. 
+ +# Input parameters with validation +inputs: + source_url: + type: string + required: true + description: URL to process + max_items: + type: integer + default: 100 + min: 1 + max: 1000 + dry_run: + type: boolean + default: false + +# Optional scheduling +schedule: + cron: "0 9 * * 1-5" # Weekdays at 9am + enabled: true + +# User-space database tables (auto-created) +tables: + processed_items: + columns: + - name: id + type: integer + primary_key: true + - name: source_url + type: string + - name: title + type: string + - name: summary + type: string + nullable: true + - name: keywords + type: json + default: [] + indexes: + - columns: [source_url] + +tags: [processing, example] + +# Steps (executed in order) +steps: + - name: fetch + type: python + code: | + # Python code with access to inputs, steps, tables + result = {"items": [...]} + + - name: enrich + type: llm + model: claude-sonnet-4-20250514 + prompt: | + Analyze: {item.content} + output_schema: + summary: string + keywords: list[string] + input: steps.fetch.items # Fan-out over items + concurrency: 3 + + - name: review + type: agentic + condition: inputs.use_agent_review # Conditional execution + model: claude-sonnet-4-20250514 + max_turns: 10 + allowed_tools: [Read, Write] + prompt: "Review the enriched items..." +``` + +### Step Types + +#### Python Step +Executes inline Python code with access to workflow context: + +```yaml +- name: process + type: python + code: | + # Available context: + # - inputs: workflow input parameters + # - steps: results from previous steps + # - tables: generated SQLModel classes + # - datetime, time, json modules + + result = { + "processed": transform(inputs["data"]), + "count": len(steps["fetch"]["items"]), + } +``` + +#### LLM Step +Direct LLM API call with structured output: + +```yaml +- name: analyze + type: llm + model: claude-sonnet-4-20250514 + prompt: | + Analyze: {item.content} + Return JSON with summary and keywords. 
+ output_schema: + summary: string + keywords: list[string] + input: steps.fetch.items # Fan-out (optional) + concurrency: 5 # Parallel calls +``` + +#### Agentic Step +Runs Claude Code subprocess for complex tasks: + +```yaml +- name: review + type: agentic + condition: inputs.use_review # Conditional + model: claude-sonnet-4-20250514 + max_turns: 10 + allowed_tools: [Bash, Read, Write] + max_tokens: 100000 + max_time: 300 + prompt: | + Review items in steps["enrich"] and improve quality. + Save report to review_output.md. +``` + +### Input Types + +| Type | Python | Validation | +|------|--------|------------| +| `string` | `str` | - | +| `integer` | `int` | `min`, `max` | +| `float` | `float` | `min`, `max` | +| `boolean` | `bool` | - | + +### Column Types (Tables) + +| Type | SQLAlchemy | Notes | +|------|------------|-------| +| `string` | `String` | - | +| `integer` | `Integer` | - | +| `float` | `Float` | - | +| `boolean` | `Boolean` | - | +| `json` | `JSON` | For lists/dicts | +| `datetime` | `DateTime` | - | + +### Data Flow + +Steps can reference previous step outputs using dot notation: + +```yaml +# Reference entire result +input: steps.fetch + +# Reference nested value +input: steps.fetch.items + +# In prompt templates +prompt: "Process {steps.fetch.count} items from {inputs.source_url}" +``` + +### Conditional Execution + +Use `condition` to skip steps based on inputs or previous results: + +```yaml +- name: expensive_step + type: llm + condition: inputs.enable_expensive # Python expression + # ... + +- name: error_handler + type: python + condition: steps.main.status == "error" + # ... 
+``` + +### CLI Commands + +```bash +# List all workflows (agent + YAML) +kurt workflows defs list +kurt workflows defs list --type yaml +kurt workflows defs list --tag processing + +# Show workflow details +kurt workflows defs show my-pipeline + +# Validate workflow files +kurt workflows defs validate +kurt workflows defs validate workflows/my.yaml + +# Run a workflow +kurt workflows defs run my-pipeline --input source_url=https://example.com +kurt workflows defs run my-pipeline --foreground + +# Initialize with examples +kurt workflows defs init --type yaml +kurt workflows defs init --type both +``` + +### Module Structure + +``` +src/kurt/workflows/ +├── yaml_parser.py # YAML parsing (Pydantic models) +├── yaml_executor.py # DBOS workflow execution +├── yaml_tables.py # Dynamic SQLModel generation +├── registry.py # Unified discovery (agent + YAML) +└── cli.py # CLI commands (optional) +``` + +### Running Programmatically + +```python +from kurt.workflows import run_yaml_definition + +# Background execution +result = run_yaml_definition("my-pipeline", inputs={"source_url": "..."}) +print(result["workflow_id"]) + +# Foreground execution +result = run_yaml_definition("my-pipeline", background=False) +print(result["status"], result["results"]) +``` + +### Table Auto-Generation + +Tables defined in YAML are automatically converted to SQLModel classes: + +1. Tables are prefixed with `user__` to avoid conflicts +2. `TimestampMixin` is automatically added (created_at, updated_at) +3. Tables are created during database initialization +4. 
Access via `tables.<table_name>` in Python steps + +### Security Considerations + +- **Python code**: Runs in sandboxed DBOS step context +- **Agentic steps**: Inherit permission model from agent workflows +- **Table names**: Auto-prefixed with `user__` to avoid conflicts + +### YAML vs Agent Workflows + +| Aspect | YAML Workflows | Agent Workflows | +|--------|----------------|-----------------| +| Definition | `.yaml` files | `.md` files | +| Structure | Explicit steps | Open-ended prompt | +| Data flow | Deterministic | Agent-controlled | +| Best for | Pipelines, ETL | Creative tasks, exploration | +| Tables | Declarative | None | +| LLM calls | Structured output | Free-form | diff --git a/src/kurt/cli/workflows.py b/src/kurt/cli/workflows.py index 829de5b2..32828ed4 100644 --- a/src/kurt/cli/workflows.py +++ b/src/kurt/cli/workflows.py @@ -380,3 +380,460 @@ def workflow_stats(workflow_id: Optional[str], output_json: bool): console.print(table) console.print() + + +# ============================================================================ +# Workflow Definitions Subgroup +# ============================================================================ + + +@workflows_group.group(name="defs") +def defs_group(): + """ + Manage workflow definitions (agent and YAML).
+ + \\b + Workflow definitions can be: + - Agent workflows (.md) - Executed by Claude Code + - YAML workflows (.yaml) - Multi-step pipelines with Python/LLM/agentic steps + """ + pass + + +@defs_group.command(name="list") +@click.option("--tag", "-t", help="Filter by tag") +@click.option( + "--type", + "wf_type", + type=click.Choice(["agent", "yaml", "all"]), + default="all", + help="Filter by workflow type", +) +@click.option("--scheduled", is_flag=True, help="Only show scheduled workflows") +@track_command +def defs_list_cmd(tag: str, wf_type: str, scheduled: bool): + """List all workflow definitions.""" + from kurt.workflows.registry import list_all_workflows + + workflows = list_all_workflows() + + if not workflows: + console.print("[dim]No workflow definitions found.[/dim]") + console.print( + "[dim]Create .md or .yaml files in workflows/ or run 'kurt workflows defs init'.[/dim]" + ) + return + + # Apply filters + if wf_type != "all": + workflows = [w for w in workflows if w.workflow_type == wf_type] + if tag: + workflows = [w for w in workflows if tag in w.tags] + if scheduled: + workflows = [w for w in workflows if w.schedule_cron] + + if not workflows: + console.print("[dim]No matching workflow definitions found.[/dim]") + return + + table = Table(title="Workflow Definitions") + table.add_column("Name", style="cyan") + table.add_column("Title") + table.add_column("Type", style="magenta") + table.add_column("Schedule") + table.add_column("Tags") + table.add_column("Steps") + + for w in workflows: + schedule_str = "-" + if w.schedule_cron: + schedule_str = w.schedule_cron + if not w.schedule_enabled: + schedule_str = f"{schedule_str} (disabled)" + + tags = ", ".join(w.tags) if w.tags else "-" + steps = str(w.step_count) if w.step_count else "-" + + table.add_row(w.name, w.title, w.workflow_type, schedule_str, tags, steps) + + console.print(table) + + +@defs_group.command(name="show") +@click.argument("name") +@track_command +def defs_show_cmd(name: str): + 
"""Show workflow definition details.""" + from kurt.workflows.registry import get_workflow, get_workflow_type + + wf_type = get_workflow_type(name) + workflow = get_workflow(name) + + if not workflow: + console.print(f"[red]Workflow not found: {name}[/red]") + raise click.Abort() + + console.print() + console.print(f"[bold cyan]{workflow.title}[/bold cyan]") + console.print(f"[dim]Name: {workflow.name}[/dim]") + console.print(f"[dim]Type: {wf_type}[/dim]") + + if workflow.description: + console.print("\n[bold]Description:[/bold]") + console.print(workflow.description) + + if wf_type == "agent": + console.print("\n[bold]Agent Config:[/bold]") + console.print(f" Model: {workflow.agent.model}") + console.print(f" Max Turns: {workflow.agent.max_turns}") + console.print(f" Permission Mode: {workflow.agent.permission_mode}") + console.print(f" Allowed Tools: {', '.join(workflow.agent.allowed_tools)}") + + console.print("\n[bold]Guardrails:[/bold]") + console.print(f" Max Tokens: {workflow.guardrails.max_tokens:,}") + console.print(f" Max Tool Calls: {workflow.guardrails.max_tool_calls}") + console.print(f" Max Time: {workflow.guardrails.max_time}s") + + elif wf_type == "yaml": + console.print("\n[bold]Steps:[/bold]") + for i, step in enumerate(workflow.steps, 1): + condition_str = f" [dim](if {step.condition})[/dim]" if step.condition else "" + console.print(f" {i}. 
{step.name} [magenta]({step.type})[/magenta]{condition_str}") + + if workflow.tables: + console.print("\n[bold]Tables:[/bold]") + for table_name, table_def in workflow.tables.items(): + cols = ", ".join(c.name for c in table_def.columns) + console.print(f" - {table_name}: {cols}") + + schedule = getattr(workflow, "schedule", None) + if schedule: + console.print("\n[bold]Schedule:[/bold]") + console.print(f" Cron: {schedule.cron}") + console.print(f" Timezone: {schedule.timezone}") + enabled_str = "[green]Yes[/green]" if schedule.enabled else "[red]No[/red]" + console.print(f" Enabled: {enabled_str}") + + if workflow.inputs: + console.print("\n[bold]Inputs:[/bold]") + for key, value in workflow.inputs.items(): + if wf_type == "yaml": + # YAML inputs are InputDef objects + desc = value.description or "" + default = f" (default: {value.default})" if value.default is not None else "" + required = " [red]*[/red]" if value.required else "" + console.print(f" {key}{required}: {value.type}{default} {desc}") + else: + # Agent inputs are simple key-value + console.print(f" {key}: {value}") + + if workflow.tags: + console.print(f"\n[bold]Tags:[/bold] {', '.join(workflow.tags)}") + + if workflow.source_path: + console.print(f"\n[dim]Source: {workflow.source_path}[/dim]") + + +@defs_group.command(name="validate") +@click.argument("file", type=click.Path(exists=True), required=False) +@track_command +def defs_validate_cmd(file: Optional[str]): + """ + Validate workflow file(s). + + If FILE is provided, validates that specific file. + Otherwise, validates all workflow files. 
+ """ + from pathlib import Path + + from kurt.workflows.agents.parser import validate_workflow as validate_agent + from kurt.workflows.registry import validate_all_workflows + from kurt.workflows.yaml_parser import validate_yaml_workflow + + if file: + path = Path(file) + if path.suffix == ".md": + errors = validate_agent(path) + elif path.suffix in (".yaml", ".yml"): + errors = validate_yaml_workflow(path) + else: + console.print(f"[red]Unknown file type: {path.suffix}[/red]") + raise click.Abort() + + if errors: + console.print("[red]Validation failed:[/red]") + for err in errors: + console.print(f" - {err}") + raise click.Abort() + else: + console.print("[green]✓ Validation passed[/green]") + else: + result = validate_all_workflows() + + if result["valid"]: + console.print(f"[green]Valid: {len(result['valid'])} workflow(s)[/green]") + for item in result["valid"]: + console.print(f" [green]✓[/green] {item['name']} ({item['type']})") + + if result["errors"]: + console.print("\n[red]Errors:[/red]") + for err in result["errors"]: + console.print(f" {err['file']} ({err['type']}):") + for e in err["errors"]: + console.print(f" - {e}") + + if not result["valid"] and not result["errors"]: + console.print("[dim]No workflow files found[/dim]") + + +@defs_group.command(name="run") +@click.argument("name") +@click.option("--input", "-i", "inputs", multiple=True, help="Input in key=value format") +@click.option("--foreground", "-f", is_flag=True, help="Run in foreground (blocking)") +@track_command +def defs_run_cmd(name: str, inputs: tuple, foreground: bool): + """ + Run a workflow definition. 
+ + \\b + Examples: + kurt workflows defs run my-workflow + kurt workflows defs run my-workflow --input source_url=https://example.com + kurt workflows defs run my-workflow --foreground + """ + from kurt.workflows.registry import get_workflow_type + + # Parse inputs + input_dict = {} + for inp in inputs: + key, _, value = inp.partition("=") + if not key: + continue + try: + input_dict[key] = json.loads(value) + except json.JSONDecodeError: + input_dict[key] = value + + console.print(f"[dim]Running workflow: {name}[/dim]") + + wf_type = get_workflow_type(name) + if not wf_type: + console.print(f"[red]Workflow not found: {name}[/red]") + raise click.Abort() + + try: + if wf_type == "agent": + from kurt.workflows.agents.executor import run_definition + + result = run_definition( + name, + inputs=input_dict if input_dict else None, + background=not foreground, + trigger="manual", + ) + else: # yaml + from kurt.workflows.yaml_executor import run_yaml_definition + + result = run_yaml_definition( + name, + inputs=input_dict if input_dict else None, + background=not foreground, + trigger="manual", + ) + + if result.get("workflow_id") and not foreground: + console.print("[green]✓ Workflow started[/green]") + console.print(f" Workflow ID: {result['workflow_id']}") + console.print() + console.print( + f"[dim]Monitor with: [cyan]kurt workflows follow {result['workflow_id']}[/cyan][/dim]" + ) + else: + console.print("[green]✓ Workflow completed[/green]") + console.print(f" Status: {result.get('status')}") + + if wf_type == "agent": + console.print(f" Turns: {result.get('turns')}") + console.print(f" Tool Calls: {result.get('tool_calls')}") + console.print( + f" Tokens: {result.get('tokens_in', 0) + result.get('tokens_out', 0):,}" + ) + console.print(f" Duration: {result.get('duration_seconds')}s") + else: + if "results" in result: + console.print(f" Steps completed: {len(result['results'])}") + + except ValueError as e: + console.print(f"[red]Error:[/red] {e}") + raise 
click.Abort() + except Exception as e: + console.print(f"[red]Error running workflow:[/red] {e}") + raise click.Abort() + + +@defs_group.command(name="init") +@click.option( + "--type", + "wf_type", + type=click.Choice(["agent", "yaml", "both"]), + default="both", + help="Type of example to create", +) +@track_command +def defs_init_cmd(wf_type: str): + """ + Initialize the workflows directory with example workflows. + """ + from kurt.workflows.registry import ensure_workflows_dir + + workflows_dir = ensure_workflows_dir() + + created = [] + + # Create agent example + if wf_type in ("agent", "both"): + agent_path = workflows_dir / "example-agent.md" + if not agent_path.exists(): + agent_content = """--- +name: example-agent +title: Example Agent Workflow +description: | + An example workflow demonstrating the agent workflow format. + Customize this to create your own automated tasks. + +agent: + model: claude-sonnet-4-20250514 + max_turns: 10 + allowed_tools: + - Bash + - Read + - Write + - Glob + +guardrails: + max_tokens: 100000 + max_tool_calls: 50 + max_time: 300 + +inputs: + task: "List files in the current directory" + +tags: [example, agent] +--- + +# Example Agent Workflow + +You are running inside an automated workflow. Complete the following task: + +**Task:** {{task}} + +## Instructions + +1. Understand the task requirements +2. Use available tools to complete the task +3. Report your findings + +## Output + +Provide a summary of what you accomplished. +""" + agent_path.write_text(agent_content) + created.append(str(agent_path)) + + # Create YAML example + if wf_type in ("yaml", "both"): + yaml_path = workflows_dir / "example-pipeline.yaml" + if not yaml_path.exists(): + yaml_content = """name: example-pipeline +title: Example YAML Pipeline +description: | + An example YAML workflow demonstrating multi-step pipelines. + This workflow prepares data, analyzes it with an LLM, and compiles results. 
+ +inputs: + topic: + type: string + required: true + description: Topic to research + max_results: + type: integer + default: 5 + min: 1 + max: 20 + +tables: + analysis_results: + columns: + - name: id + type: integer + primary_key: true + - name: topic + type: string + - name: summary + type: string + nullable: true + - name: keywords + type: json + default: [] + indexes: + - columns: [topic] + +steps: + - name: prepare + type: python + code: | + # Prepare the research context + result = { + "topic": inputs["topic"], + "max_results": inputs["max_results"], + "prepared_at": str(datetime.now()), + } + + - name: analyze + type: llm + model: claude-sonnet-4-20250514 + prompt: | + Analyze the following topic and provide a brief summary with keywords: + + Topic: {inputs.topic} + + Return your response as JSON with these fields: + - summary: A 2-3 sentence summary + - keywords: A list of 3-5 relevant keywords + output_schema: + summary: string + keywords: list[string] + + - name: report + type: python + code: | + # Compile the final report + result = { + "topic": inputs["topic"], + "summary": steps["analyze"].get("summary", "No summary available"), + "keywords": steps["analyze"].get("keywords", []), + "metadata": { + "prepared_at": steps["prepare"]["prepared_at"], + "completed_at": str(datetime.now()), + } + } + +tags: [example, yaml] +""" + yaml_path.write_text(yaml_content) + created.append(str(yaml_path)) + + if created: + console.print("[green]✓ Created example workflow(s):[/green]") + for path in created: + console.print(f" - {path}") + console.print() + console.print("[dim]Run with:[/dim]") + if wf_type in ("agent", "both"): + console.print("[cyan] kurt workflows defs run example-agent[/cyan]") + if wf_type in ("yaml", "both"): + console.print( + "[cyan] kurt workflows defs run example-pipeline --input topic='AI safety'[/cyan]" + ) + else: + console.print("[dim]Example workflows already exist.[/dim]") diff --git a/src/kurt/db/sqlite.py b/src/kurt/db/sqlite.py 
index ec911a51..a258b29e 100644 --- a/src/kurt/db/sqlite.py +++ b/src/kurt/db/sqlite.py @@ -88,6 +88,14 @@ def init_database(self) -> None: register_all_models() + # Discover YAML workflows and generate their table models + try: + from kurt.workflows.registry import discover_yaml_workflows + + discover_yaml_workflows() + except Exception as e: + logger.warning(f"Failed to discover YAML workflows: {e}") + self._ensure_directory() db_path = self.get_database_path() diff --git a/src/kurt/integrations/research/cli.py b/src/kurt/integrations/research/cli.py index 463de394..dfbe618a 100644 --- a/src/kurt/integrations/research/cli.py +++ b/src/kurt/integrations/research/cli.py @@ -27,7 +27,7 @@ def research_group(): @click.option( "--source", default="perplexity", - type=click.Choice(["perplexity"]), + type=click.Choice(["perplexity", "apify"]), help="Research source to configure", ) @track_command @@ -75,6 +75,10 @@ def onboard_cmd(source: str): console.print("\n[bold]Perplexity Setup:[/bold]") console.print(" API_KEY: Get from https://www.perplexity.ai/settings/api") console.print(" DEFAULT_MODEL: sonar-reasoning (recommended)") + elif source == "apify": + console.print("\n[bold]Apify Setup:[/bold]") + console.print(" API_TOKEN: Get from https://console.apify.com/account/integrations") + console.print(" DEFAULT_ACTOR: apidojo/tweet-scraper (Twitter/X search)") return # Test connection @@ -91,6 +95,20 @@ def onboard_cmd(source: str): else: console.print("[red]\u2717 Connection failed[/red]") raise click.Abort() + elif source == "apify": + from .monitoring.apify import ApifyAdapter + + adapter = ApifyAdapter(source_config) + if adapter.test_connection(): + console.print(f"[green]\u2713 Connected to {source.capitalize()}[/green]") + # Show account info + user_info = adapter.get_user_info() + if user_info: + username = user_info.get("username", "unknown") + console.print(f"[dim] Account: {username}[/dim]") + else: + console.print("[red]\u2717 Connection failed - check your 
API token[/red]") + raise click.Abort() else: console.print(f"[yellow]No connection test available for {source}[/yellow]") @@ -128,7 +146,7 @@ def status_cmd(): return # Check each known source - known_sources = ["perplexity"] + known_sources = ["perplexity", "apify"] for source in known_sources: if source_configured(source): @@ -143,3 +161,95 @@ def status_cmd(): console.print(f" [dim]-[/dim] {source.capitalize()} - [dim]Not configured[/dim]") console.print() + + +@research_group.command("apify") +@click.option("--query", "-q", required=True, help="Search term or hashtag") +@click.option( + "--platform", + "-p", + type=click.Choice(["twitter", "linkedin", "threads"]), + default="twitter", + help="Social platform to search", +) +@click.option("--max-items", default=20, help="Maximum items to fetch") +@click.option("--output", "-o", type=click.Path(), help="Output file (JSON)") +@track_command +def apify_cmd(query: str, platform: str, max_items: int, output: str | None): + """ + Search social media via Apify actors. 
+ + \b + Examples: + kurt integrations research apify -q "AI agents" + kurt integrations research apify -q "#devtools" --platform twitter + kurt integrations research apify -q "machine learning" -p linkedin --max-items 50 + kurt integrations research apify -q "startup" -o results.json + """ + import json + + from .config import get_source_config, source_configured + from .monitoring.apify import ApifyAdapter + + # Check configuration + if not source_configured("apify"): + console.print("[yellow]Apify not configured.[/yellow]") + console.print("Run: [cyan]kurt integrations research onboard --source apify[/cyan]") + raise click.Abort() + + try: + config = get_source_config("apify") + adapter = ApifyAdapter(config) + + console.print(f"[dim]Searching {platform} for: {query}...[/dim]") + + # Use platform-specific method + if platform == "twitter": + signals = adapter.search_twitter(query, max_items=max_items) + elif platform == "linkedin": + signals = adapter.search_linkedin(query, max_items=max_items) + elif platform == "threads": + signals = adapter.search_threads(query, max_items=max_items) + else: + signals = adapter.fetch_signals(query, max_items=max_items) + + if not signals: + console.print("[yellow]No results found.[/yellow]") + return + + # Convert to output format + results = [ + { + "title": s.title[:80], + "url": s.url, + "score": s.score, + "comments": s.comment_count, + "relevance": round(s.relevance_score, 3), + "author": s.author, + "timestamp": s.timestamp.isoformat() if s.timestamp else None, + } + for s in signals + ] + + if output: + with open(output, "w") as f: + json.dump(results, f, indent=2) + console.print(f"[green]✓ Saved {len(results)} results to {output}[/green]") + else: + console.print(f"\n[bold]Found {len(signals)} results:[/bold]\n") + for s in signals[:10]: + score_str = f"[{s.relevance_score:.2f}]" + title_str = s.title[:60] + "..." 
if len(s.title) > 60 else s.title + console.print(f" {score_str} {title_str}") + if s.url: + console.print(f" [dim]{s.url[:70]}[/dim]") + + if len(signals) > 10: + console.print(f"\n [dim]... and {len(signals) - 10} more[/dim]") + + except ValueError as e: + console.print(f"[red]Configuration error:[/red] {e}") + raise click.Abort() + except Exception as e: + console.print(f"[red]Error:[/red] {e}") + raise click.Abort() diff --git a/src/kurt/integrations/research/config.py b/src/kurt/integrations/research/config.py index 343bea03..73ed061b 100644 --- a/src/kurt/integrations/research/config.py +++ b/src/kurt/integrations/research/config.py @@ -62,7 +62,7 @@ def get_source_config(source: str) -> dict[str, Any]: Get configuration for a specific research source. Args: - source: Research source name (e.g., 'perplexity') + source: Research source name (e.g., 'perplexity', 'apify') Returns: Source-specific configuration dictionary @@ -74,23 +74,27 @@ def get_source_config(source: str) -> dict[str, Any]: if source not in config: available = ", ".join(config.keys()) if config else "none configured" + # Determine the correct key name for this source + key_name = "API_TOKEN" if source == "apify" else "API_KEY" raise ValueError( f"No configuration found for research source '{source}'.\n" f"Available sources: {available}\n" f"\n" f"To configure {source}, add to kurt.config:\n" - f" RESEARCH_{source.upper()}_API_KEY=your_api_key_here" + f" RESEARCH_{source.upper()}_{key_name}=your_key_here" ) - # Check for placeholder API key + # Check for placeholder API key/token source_config = config[source] - api_key = source_config.get("api_key", "") - if "YOUR_" in api_key or "PLACEHOLDER" in api_key: + # Check both api_key and api_token (different sources use different names) + api_credential = source_config.get("api_key") or source_config.get("api_token", "") + if "YOUR_" in api_credential or "PLACEHOLDER" in api_credential: + key_name = "API_TOKEN" if source == "apify" else "API_KEY" 
raise ValueError( - f"API key not configured for '{source}'.\n" + f"API credentials not configured for '{source}'.\n" f"\n" f"Edit kurt.config and update:\n" - f" RESEARCH_{source.upper()}_API_KEY=your_actual_api_key" + f" RESEARCH_{source.upper()}_{key_name}=your_actual_key" ) return source_config @@ -127,6 +131,11 @@ def create_template_config(source: str) -> dict[str, Any]: "max_tokens": "4000", "temperature": "0.2", } + elif source == "apify": + return { + "api_token": "YOUR_APIFY_API_TOKEN", + "default_actor": "apidojo/tweet-scraper", + } else: return { "api_key": "YOUR_API_KEY", @@ -146,7 +155,7 @@ def source_configured(source: str) -> bool: source: Research source name Returns: - True if source is configured with valid API key + True if source is configured with valid API key/token """ try: config = load_research_config() @@ -159,9 +168,9 @@ def source_configured(source: str) -> bool: if has_placeholder_values(source_config): return False - # Check for empty API key - api_key = source_config.get("api_key", "") - if not api_key: + # Check for empty API key/token (different sources use different names) + api_credential = source_config.get("api_key") or source_config.get("api_token", "") + if not api_credential: return False return True diff --git a/src/kurt/integrations/research/monitoring/__init__.py b/src/kurt/integrations/research/monitoring/__init__.py index 8e0a76fe..255b0285 100644 --- a/src/kurt/integrations/research/monitoring/__init__.py +++ b/src/kurt/integrations/research/monitoring/__init__.py @@ -1,9 +1,14 @@ """ Monitoring adapters for research signals. -Provides adapters for Reddit, HackerNews, and RSS/Atom feeds. +Provides adapters for Reddit, HackerNews, RSS/Atom feeds, and Apify (social media). 
""" +from kurt.integrations.research.monitoring.apify import ( + ActorConfig, + ApifyAdapter, + FieldMapping, +) from kurt.integrations.research.monitoring.feeds import FeedAdapter from kurt.integrations.research.monitoring.hackernews import HackerNewsAdapter from kurt.integrations.research.monitoring.models import Signal @@ -14,4 +19,7 @@ "RedditAdapter", "HackerNewsAdapter", "FeedAdapter", + "ApifyAdapter", + "ActorConfig", + "FieldMapping", ] diff --git a/src/kurt/integrations/research/monitoring/apify.py b/src/kurt/integrations/research/monitoring/apify.py new file mode 100644 index 00000000..d73e97bc --- /dev/null +++ b/src/kurt/integrations/research/monitoring/apify.py @@ -0,0 +1,684 @@ +""" +Apify adapter for social media monitoring. + +Uses Apify's API to run actors for Twitter/X, LinkedIn, and other platforms. +Requires an API token configured in kurt.config as RESEARCH_APIFY_API_TOKEN. + +Supports flexible actor configuration: +- Use built-in actor presets for common platforms +- Pass raw actor input for full control +- Configure field mappings for custom actors +""" + +from __future__ import annotations + +import hashlib +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any, Callable + +import httpx + +from kurt.integrations.research.monitoring.models import Signal + + +@dataclass +class FieldMapping: + """ + Maps actor output fields to Signal fields. 
+ + Each field can be: + - A string: direct field name lookup + - A list of strings: try each in order, use first non-null + - A callable: function(item) -> value for complex extraction + """ + + text: str | list[str] | Callable[[dict], str] = field( + default_factory=lambda: ["text", "content", "title", "postContent", "description"] + ) + url: str | list[str] | Callable[[dict], str] = field( + default_factory=lambda: ["url", "postUrl", "link", "profileUrl"] + ) + id: str | list[str] | Callable[[dict], str] = field( + default_factory=lambda: ["id", "postId", "objectID", "tweetId"] + ) + score: str | list[str] | Callable[[dict], int] = field( + default_factory=lambda: ["likeCount", "likes", "numLikes", "reactions", "favoriteCount"] + ) + comments: str | list[str] | Callable[[dict], int] = field( + default_factory=lambda: ["replyCount", "replies", "commentCount", "numComments"] + ) + author: str | list[str] | Callable[[dict], str] = field( + default_factory=lambda: ["author", "username", "authorName", "user"] + ) + timestamp: str | list[str] | Callable[[dict], str] = field( + default_factory=lambda: ["createdAt", "created_at", "publishedAt", "postedAt", "date"] + ) + + +@dataclass +class ActorConfig: + """ + Configuration for a specific Apify actor. + + Defines how to build input and parse output for an actor. 
+ """ + + actor_id: str + source_name: str # e.g., "twitter", "linkedin" + + # Input configuration + # Function to build actor input: (query, max_items, **kwargs) -> dict + build_input: Callable[[str, int, dict], dict] | None = None + + # Output configuration + field_mapping: FieldMapping = field(default_factory=FieldMapping) + + # Description for CLI help + description: str = "" + + +def _build_twitter_search_input(query: str, max_items: int, kwargs: dict) -> dict: + """Build input for Twitter search actors.""" + return { + "searchTerms": [query], + "maxItems": max_items, + "sort": kwargs.get("sort", "Latest"), + **{k: v for k, v in kwargs.items() if k != "sort"}, + } + + +def _build_twitter_profile_input(query: str, max_items: int, kwargs: dict) -> dict: + """Build input for Twitter profile scraper actors.""" + return { + "handles": [query] if not isinstance(query, list) else query, + "maxItems": max_items, + "tweetsDesired": kwargs.get("tweets_desired", max_items), + **{k: v for k, v in kwargs.items() if k not in ["tweets_desired"]}, + } + + +def _build_linkedin_search_input(query: str, max_items: int, kwargs: dict) -> dict: + """Build input for LinkedIn search actors.""" + # Check if query is already a URL + if query.startswith("http"): + search_url = query + else: + search_url = f"https://www.linkedin.com/search/results/content/?keywords={query}" + return { + "searchUrl": search_url, + "maxItems": max_items, + **kwargs, + } + + +def _build_linkedin_profile_input(query: str, max_items: int, kwargs: dict) -> dict: + """Build input for LinkedIn profile scraper actors.""" + # Query can be profile URL or list of URLs + urls = [query] if isinstance(query, str) else query + return { + "profileUrls": urls, + **kwargs, + } + + +def _build_generic_search_input(query: str, max_items: int, kwargs: dict) -> dict: + """Build generic search input - works for many actors.""" + return { + "searchTerms": [query], + "maxItems": max_items, + **kwargs, + } + + +# Registry of known 
actors with their configurations
# fetch_signals() consults this registry to pick the input builder, the
# source label, and the default field mapping for a given actor ID.
ACTOR_REGISTRY: dict[str, ActorConfig] = {
    # Twitter/X actors
    "apidojo/tweet-scraper": ActorConfig(
        actor_id="apidojo/tweet-scraper",
        source_name="twitter",
        build_input=_build_twitter_search_input,
        description="Search Twitter/X for tweets matching a query",
    ),
    "quacker/twitter-scraper": ActorConfig(
        actor_id="quacker/twitter-scraper",
        source_name="twitter",
        build_input=_build_twitter_search_input,
        description="Alternative Twitter search scraper",
    ),
    "apidojo/twitter-user-scraper": ActorConfig(
        actor_id="apidojo/twitter-user-scraper",
        source_name="twitter",
        build_input=_build_twitter_profile_input,
        description="Scrape tweets from specific Twitter profiles",
    ),
    # LinkedIn actors
    "curious_coder/linkedin-post-search-scraper": ActorConfig(
        actor_id="curious_coder/linkedin-post-search-scraper",
        source_name="linkedin",
        build_input=_build_linkedin_search_input,
        description="Search LinkedIn for posts matching a query",
    ),
    "anchor/linkedin-profile-scraper": ActorConfig(
        actor_id="anchor/linkedin-profile-scraper",
        source_name="linkedin",
        build_input=_build_linkedin_profile_input,
        description="Scrape LinkedIn profile data",
    ),
    # Threads actors
    "apidojo/threads-scraper": ActorConfig(
        actor_id="apidojo/threads-scraper",
        source_name="threads",
        build_input=_build_generic_search_input,
        description="Search Threads for posts",
    ),
}

# Platform aliases map to default actors
# (used by the search_* convenience methods and as the adapter default).
PLATFORM_DEFAULTS = {
    "twitter": "apidojo/tweet-scraper",
    "linkedin": "curious_coder/linkedin-post-search-scraper",
    "threads": "apidojo/threads-scraper",
}


class ApifyAdapter:
    """
    Adapter for fetching social signals via Apify actors.

    Supports three levels of usage:

    1. High-level convenience methods:
        adapter.search_twitter("AI agents")
        adapter.search_linkedin("B2B marketing")

    2.
 Mid-level with actor selection:
        adapter.fetch_signals("query", actor="apidojo/tweet-scraper")
        adapter.fetch_signals("@username", actor="apidojo/twitter-user-scraper")

    3. Low-level raw execution:
        result = adapter.run_actor("any/actor", {"custom": "input"})
        signals = adapter.parse_results(result, field_mapping=custom_mapping)
    """

    BASE_URL = "https://api.apify.com/v2"

    def __init__(
        self,
        config: dict[str, Any],
        actor_registry: dict[str, ActorConfig] | None = None,
    ):
        """
        Initialize Apify adapter.

        Args:
            config: Configuration dict with api_token and optional settings
            actor_registry: Custom actor registry (extends built-in registry)
        """
        self.api_token = config["api_token"]  # required; KeyError if absent
        self.default_actor = config.get("default_actor", PLATFORM_DEFAULTS["twitter"])

        # Merge custom registry with built-in; custom entries win on collision.
        self.actor_registry = {**ACTOR_REGISTRY}
        if actor_registry:
            self.actor_registry.update(actor_registry)

        # Per-actor field mappings registered later via set_field_mapping()
        self._custom_mappings: dict[str, FieldMapping] = {}

    def register_actor(self, actor_config: ActorConfig) -> None:
        """
        Register a custom actor configuration.

        Overwrites any existing entry with the same actor_id.

        Args:
            actor_config: Actor configuration to register
        """
        self.actor_registry[actor_config.actor_id] = actor_config

    def set_field_mapping(self, actor_id: str, mapping: FieldMapping) -> None:
        """
        Set custom field mapping for an actor.
+ + Args: + actor_id: Actor ID to configure + mapping: Field mapping configuration + """ + self._custom_mappings[actor_id] = mapping + + def test_connection(self) -> bool: + """Test API token validity.""" + try: + response = httpx.get( + f"{self.BASE_URL}/users/me", + headers={"Authorization": f"Bearer {self.api_token}"}, + timeout=10.0, + ) + return response.status_code == 200 + except httpx.RequestError: + return False + + def get_user_info(self) -> dict[str, Any] | None: + """Get user account information.""" + try: + response = httpx.get( + f"{self.BASE_URL}/users/me", + headers={"Authorization": f"Bearer {self.api_token}"}, + timeout=10.0, + ) + if response.status_code == 200: + return response.json() + return None + except httpx.RequestError: + return None + + def list_actors(self) -> list[dict[str, str]]: + """ + List all registered actors with descriptions. + + Returns: + List of dicts with actor_id, source_name, description + """ + return [ + { + "actor_id": cfg.actor_id, + "source_name": cfg.source_name, + "description": cfg.description, + } + for cfg in self.actor_registry.values() + ] + + # ========================================================================= + # Low-level API: Raw actor execution + # ========================================================================= + + def run_actor( + self, + actor: str, + actor_input: dict[str, Any], + timeout: float = 120.0, + ) -> list[dict[str, Any]]: + """ + Run an Apify actor with raw input and return raw output. + + This is the lowest-level method - use for full control over + actor execution when the convenience methods don't fit. 
+ + Args: + actor: Actor ID (e.g., "apidojo/tweet-scraper") + actor_input: Raw input dict passed directly to the actor + timeout: Request timeout in seconds + + Returns: + Raw list of result items from the actor + + Raises: + Exception: If actor run fails + """ + try: + response = httpx.post( + f"{self.BASE_URL}/acts/{actor}/run-sync-get-dataset-items", + headers={ + "Authorization": f"Bearer {self.api_token}", + "Content-Type": "application/json", + }, + json=actor_input, + timeout=timeout, + ) + response.raise_for_status() + return response.json() + except httpx.TimeoutException: + raise Exception(f"Apify actor {actor} timed out after {timeout} seconds") + except httpx.HTTPStatusError as e: + raise Exception(f"Apify API error: {e.response.status_code} - {e.response.text}") + except httpx.RequestError as e: + raise Exception(f"Failed to connect to Apify: {e}") + + def parse_results( + self, + items: list[dict[str, Any]], + source: str = "apify", + query: str = "", + field_mapping: FieldMapping | None = None, + ) -> list[Signal]: + """ + Parse raw actor results into Signal objects. + + Use with run_actor() for custom parsing of actor output. 
        Args:
            items: Raw result items from run_actor()
            source: Source name for signals (e.g., "twitter")
            query: Original query (stored in keywords)
            field_mapping: Custom field mapping (uses defaults if None)

        Returns:
            List of Signal objects
        """
        mapping = field_mapping or FieldMapping()
        signals = []

        for item in items:
            try:
                signal = self._parse_item_with_mapping(item, source, query, mapping)
                if signal:
                    signals.append(signal)
            except Exception:
                # Best-effort parsing: one malformed item is skipped rather
                # than failing the whole batch.
                # NOTE(review): consider logging the dropped item for debugging.
                continue

        return signals

    # =========================================================================
    # Mid-level API: Actor-aware execution
    # =========================================================================

    def fetch_signals(
        self,
        query: str,
        actor: str | None = None,
        max_items: int = 50,
        keywords: list[str] | None = None,
        actor_input: dict[str, Any] | None = None,
        field_mapping: FieldMapping | None = None,
        **kwargs: Any,
    ) -> list[Signal]:
        """
        Fetch social signals via Apify actor.
        Args:
            query: Search term, hashtag, or profile username
            actor: Apify actor ID (uses default if None)
            max_items: Maximum items to return
            keywords: Optional keyword filter (applied after fetch)
            actor_input: Raw actor input (bypasses input building if provided)
            field_mapping: Custom field mapping for output parsing
            **kwargs: Additional actor-specific parameters

        Returns:
            List of Signal objects sorted by relevance
        """
        actor = actor or self.default_actor

        # Get actor config if registered (None for unknown actors)
        actor_config = self.actor_registry.get(actor)

        # Build or use provided input
        if actor_input is not None:
            # Raw input provided - use directly (query/max_items are ignored)
            final_input = actor_input
        elif actor_config and actor_config.build_input:
            # Use registered input builder
            final_input = actor_config.build_input(query, max_items, kwargs)
        else:
            # Fall back to generic input
            final_input = _build_generic_search_input(query, max_items, kwargs)

        # Run actor
        items = self.run_actor(actor, final_input)

        # Determine source name
        source = actor_config.source_name if actor_config else self._guess_source(actor)

        # Get field mapping (priority: param > custom > actor config > default)
        mapping = (
            field_mapping
            or self._custom_mappings.get(actor)
            or (actor_config.field_mapping if actor_config else None)
            or FieldMapping()
        )

        # Parse results
        signals = self.parse_results(items, source, query, mapping)

        # Filter by keywords if provided
        if keywords:
            signals = [s for s in signals if s.matches_keywords(keywords)]

        # Sort by relevance, then truncate to the requested count
        signals.sort(key=lambda s: s.relevance_score, reverse=True)

        return signals[:max_items]

    # =========================================================================
    # High-level API: Platform convenience methods
    # =========================================================================

    def search_twitter(
        self,
        query: str,
        max_items: int = 50,
        keywords: list[str] | None = None,
        actor: str |
 None = None,
        **kwargs: Any,
    ) -> list[Signal]:
        """
        Search Twitter/X for posts matching query.

        Thin wrapper around fetch_signals() with the platform default actor.

        Args:
            query: Search term or hashtag
            max_items: Maximum items to return
            keywords: Optional keyword filter
            actor: Specific Twitter actor (uses default if None)
            **kwargs: Additional actor parameters
        """
        actor = actor or PLATFORM_DEFAULTS["twitter"]
        return self.fetch_signals(
            query=query,
            actor=actor,
            max_items=max_items,
            keywords=keywords,
            **kwargs,
        )

    def search_linkedin(
        self,
        query: str,
        max_items: int = 50,
        keywords: list[str] | None = None,
        actor: str | None = None,
        **kwargs: Any,
    ) -> list[Signal]:
        """
        Search LinkedIn for posts matching query.

        Thin wrapper around fetch_signals() with the platform default actor.

        Args:
            query: Search term or LinkedIn search URL
            max_items: Maximum items to return
            keywords: Optional keyword filter
            actor: Specific LinkedIn actor (uses default if None)
            **kwargs: Additional actor parameters
        """
        actor = actor or PLATFORM_DEFAULTS["linkedin"]
        return self.fetch_signals(
            query=query,
            actor=actor,
            max_items=max_items,
            keywords=keywords,
            **kwargs,
        )

    def search_threads(
        self,
        query: str,
        max_items: int = 50,
        keywords: list[str] | None = None,
        actor: str | None = None,
        **kwargs: Any,
    ) -> list[Signal]:
        """
        Search Threads for posts matching query.

        Thin wrapper around fetch_signals() with the platform default actor.

        Args:
            query: Search term or hashtag
            max_items: Maximum items to return
            keywords: Optional keyword filter
            actor: Specific Threads actor (uses default if None)
            **kwargs: Additional actor parameters
        """
        actor = actor or PLATFORM_DEFAULTS["threads"]
        return self.fetch_signals(
            query=query,
            actor=actor,
            max_items=max_items,
            keywords=keywords,
            **kwargs,
        )

    def scrape_profile(
        self,
        profile: str,
        platform: str = "twitter",
        max_items: int = 50,
        **kwargs: Any,
    ) -> list[Signal]:
        """
        Scrape posts from a specific profile.
        Args:
            profile: Username or profile URL
            platform: Platform name (twitter, linkedin)
            max_items: Maximum items to return
            **kwargs: Additional actor parameters

        Raises:
            ValueError: If no profile scraper is registered for the platform
        """
        # Map platform to profile scraper actor
        profile_actors = {
            "twitter": "apidojo/twitter-user-scraper",
            "linkedin": "anchor/linkedin-profile-scraper",
        }

        actor = profile_actors.get(platform)
        if not actor:
            raise ValueError(f"No profile scraper available for platform: {platform}")

        return self.fetch_signals(
            query=profile,
            actor=actor,
            max_items=max_items,
            **kwargs,
        )

    # =========================================================================
    # Internal helpers
    # =========================================================================

    def _guess_source(self, actor: str) -> str:
        """Guess source name from actor ID.

        Substring heuristic over the lowercased actor ID; used only for
        actors not present in the registry. Falls back to "apify".
        """
        actor_lower = actor.lower()
        if "twitter" in actor_lower or "tweet" in actor_lower:
            return "twitter"
        elif "linkedin" in actor_lower:
            return "linkedin"
        elif "threads" in actor_lower:
            return "threads"
        return "apify"

    def _extract_field(
        self,
        item: dict[str, Any],
        field_spec: str | list[str] | Callable[[dict], Any],
    ) -> Any:
        """Extract a field value using the field specification.

        A callable spec is invoked with the item; a list spec returns the
        first non-None candidate; a string spec is a (possibly dotted) key.
        """
        if callable(field_spec):
            return field_spec(item)
        elif isinstance(field_spec, list):
            for field_name in field_spec:
                value = self._get_nested(item, field_name)
                if value is not None:
                    return value
            return None
        else:
            return self._get_nested(item, field_spec)

    def _get_nested(self, item: dict[str, Any], field_name: str) -> Any:
        """Get a possibly nested field value (supports dot notation)."""
        if "."
 in field_name:
            parts = field_name.split(".")
            value = item
            for part in parts:
                if isinstance(value, dict):
                    value = value.get(part)
                else:
                    # Path descends into a non-dict; treat as missing.
                    return None
            return value
        return item.get(field_name)

    def _parse_item_with_mapping(
        self,
        item: dict[str, Any],
        source: str,
        query: str,
        mapping: FieldMapping,
    ) -> Signal | None:
        """Parse a single item using field mapping.

        Returns None when the item has neither text nor URL (unusable).
        """
        text = self._extract_field(item, mapping.text) or ""
        url = self._extract_field(item, mapping.url) or ""

        if not text and not url:
            return None

        # Extract ID; fall back to a content fingerprint when the actor
        # provides none (md5 used as a stable hash, not for security).
        item_id = self._extract_field(item, mapping.id)
        if not item_id:
            item_id = hashlib.md5(f"{url}{text[:100]}".encode()).hexdigest()[:12]
        signal_id = f"{source}_{item_id}"

        # Extract metrics
        score = self._extract_field(item, mapping.score) or 0
        comment_count = self._extract_field(item, mapping.comments) or 0

        # Extract author (actors return either a string or a user dict)
        author_value = self._extract_field(item, mapping.author)
        if isinstance(author_value, dict):
            author = author_value.get("username") or author_value.get("name")
        else:
            author = author_value

        # Parse timestamp
        timestamp = self._parse_date(self._extract_field(item, mapping.timestamp))

        # Title is first line or truncated text
        title = text.split("\n")[0][:200] if text else url[:200]

        return Signal(
            signal_id=signal_id,
            source=source,
            title=title,
            url=url,
            snippet=text[:500] if text else None,
            timestamp=timestamp,
            author=author,
            score=int(score) if score else 0,
            comment_count=int(comment_count) if comment_count else 0,
            keywords=[query] if query else [],
        )

    def _parse_date(self, date_str: str | None) -> datetime:
        """Parse ISO date string with fallback to now.

        NOTE(review): fromisoformat() yields a timezone-aware datetime for
        inputs with "Z"/offsets, while the datetime.now() fallback is naive;
        comparing the two raises TypeError. Confirm downstream only sorts by
        relevance_score, or make the fallback timezone-aware.
        """
        if not date_str:
            return datetime.now()
        try:
            if isinstance(date_str, str):
                clean = date_str.replace("Z", "+00:00")
                return datetime.fromisoformat(clean)
            return datetime.now()
        except (ValueError, TypeError):
            return datetime.now()

    # Legacy compatibility aliases
+ def _run_actor_sync(self, actor: str, input_data: dict) -> list[dict]: + """Legacy alias for run_actor.""" + return self.run_actor(actor, input_data) + + def _actor_to_source(self, actor: str) -> str: + """Legacy alias for _guess_source.""" + return self._guess_source(actor) + + def _build_actor_input(self, actor: str, query: str, max_items: int, **kwargs) -> dict: + """Legacy method - builds input using registry or generic fallback.""" + actor_config = self.actor_registry.get(actor) + if actor_config and actor_config.build_input: + return actor_config.build_input(query, max_items, kwargs) + return _build_generic_search_input(query, max_items, kwargs) + + def _parse_signals(self, items: list[dict], source: str, query: str) -> list[Signal]: + """Legacy alias for parse_results.""" + return self.parse_results(items, source, query) diff --git a/src/kurt/integrations/research/tests/test_adapters.py b/src/kurt/integrations/research/tests/test_adapters.py index efff5195..fb0b02ec 100644 --- a/src/kurt/integrations/research/tests/test_adapters.py +++ b/src/kurt/integrations/research/tests/test_adapters.py @@ -3,7 +3,10 @@ from datetime import datetime from unittest.mock import MagicMock, patch +import pytest + from kurt.integrations.research.base import Citation, ResearchResult +from kurt.integrations.research.monitoring.apify import ApifyAdapter from kurt.integrations.research.monitoring.feeds import FeedAdapter from kurt.integrations.research.monitoring.hackernews import HackerNewsAdapter from kurt.integrations.research.monitoring.models import Signal @@ -254,3 +257,442 @@ def test_check_feed_valid(self, mock_parse): assert result["valid"] is True assert result["title"] == "Test Feed" assert result["entry_count"] == 1 + + +class TestApifyAdapter: + """Tests for Apify adapter.""" + + def test_init(self): + """Test adapter initialization.""" + config = {"api_token": "test_token"} + adapter = ApifyAdapter(config) + + assert adapter.api_token == "test_token" + assert 
adapter.default_actor == "apidojo/tweet-scraper" + + def test_init_with_custom_actor(self): + """Test adapter initialization with custom default actor.""" + config = { + "api_token": "test_token", + "default_actor": "custom/actor", + } + adapter = ApifyAdapter(config) + + assert adapter.default_actor == "custom/actor" + + @patch("httpx.get") + def test_test_connection_success(self, mock_get): + """Test successful connection test.""" + mock_response = MagicMock() + mock_response.status_code = 200 + mock_get.return_value = mock_response + + adapter = ApifyAdapter({"api_token": "test_token"}) + result = adapter.test_connection() + + assert result is True + mock_get.assert_called_once() + + @patch("httpx.get") + def test_test_connection_failure(self, mock_get): + """Test failed connection test.""" + mock_response = MagicMock() + mock_response.status_code = 401 + mock_get.return_value = mock_response + + adapter = ApifyAdapter({"api_token": "bad_token"}) + result = adapter.test_connection() + + assert result is False + + @patch("httpx.post") + def test_fetch_signals_twitter(self, mock_post): + """Test fetching Twitter signals.""" + mock_response = MagicMock() + mock_response.json.return_value = [ + { + "id": "123456", + "text": "This is a test tweet about AI", + "url": "https://twitter.com/user/status/123456", + "createdAt": "2024-01-15T12:00:00Z", + "author": {"username": "testuser"}, + "likeCount": 100, + "replyCount": 25, + }, + { + "id": "789012", + "text": "Another tweet about machine learning", + "url": "https://twitter.com/user/status/789012", + "createdAt": "2024-01-15T11:00:00Z", + "author": {"username": "anotheruser"}, + "likeCount": 50, + "replyCount": 10, + }, + ] + mock_response.raise_for_status = MagicMock() + mock_post.return_value = mock_response + + adapter = ApifyAdapter({"api_token": "test_token"}) + signals = adapter.fetch_signals("AI", max_items=10) + + assert len(signals) == 2 + assert signals[0].signal_id == "twitter_123456" + assert 
signals[0].source == "twitter" + assert "test tweet" in signals[0].title + assert signals[0].score == 100 + assert signals[0].comment_count == 25 + assert signals[0].author == "testuser" + + @patch("httpx.post") + def test_fetch_signals_linkedin(self, mock_post): + """Test fetching LinkedIn signals.""" + mock_response = MagicMock() + mock_response.json.return_value = [ + { + "postId": "urn:li:share:abc123", + "postContent": "Great insights on B2B marketing", + "postUrl": "https://linkedin.com/posts/abc123", + "reactions": 200, + "numComments": 45, + "authorName": "Marketing Pro", + } + ] + mock_response.raise_for_status = MagicMock() + mock_post.return_value = mock_response + + adapter = ApifyAdapter({"api_token": "test_token"}) + signals = adapter.search_linkedin("B2B marketing", max_items=10) + + assert len(signals) == 1 + assert signals[0].source == "linkedin" + assert "B2B marketing" in signals[0].title + assert signals[0].score == 200 + assert signals[0].comment_count == 45 + + @patch("httpx.post") + def test_fetch_signals_with_keyword_filter(self, mock_post): + """Test keyword filtering on fetched signals.""" + mock_response = MagicMock() + mock_response.json.return_value = [ + { + "id": "1", + "text": "Tweet about Python programming", + "url": "https://twitter.com/1", + "likeCount": 100, + }, + { + "id": "2", + "text": "Tweet about JavaScript", + "url": "https://twitter.com/2", + "likeCount": 50, + }, + ] + mock_response.raise_for_status = MagicMock() + mock_post.return_value = mock_response + + adapter = ApifyAdapter({"api_token": "test_token"}) + signals = adapter.fetch_signals("programming", keywords=["Python"]) + + assert len(signals) == 1 + assert "Python" in signals[0].title + + @patch("httpx.post") + def test_fetch_signals_sorted_by_relevance(self, mock_post): + """Test that signals are sorted by relevance score.""" + mock_response = MagicMock() + mock_response.json.return_value = [ + { + "id": "low", + "text": "Low engagement post", + "url": 
"https://twitter.com/low", + "likeCount": 5, + "replyCount": 1, + }, + { + "id": "high", + "text": "High engagement post", + "url": "https://twitter.com/high", + "likeCount": 500, + "replyCount": 100, + }, + ] + mock_response.raise_for_status = MagicMock() + mock_post.return_value = mock_response + + adapter = ApifyAdapter({"api_token": "test_token"}) + signals = adapter.fetch_signals("test") + + # High engagement should be first + assert signals[0].signal_id == "twitter_high" + assert signals[0].relevance_score > signals[1].relevance_score + + def test_actor_to_source_mapping(self): + """Test actor ID to source name mapping.""" + adapter = ApifyAdapter({"api_token": "test"}) + + assert adapter._actor_to_source("apidojo/tweet-scraper") == "twitter" + assert adapter._actor_to_source("some/twitter-actor") == "twitter" + assert adapter._actor_to_source("curious_coder/linkedin-post-search-scraper") == "linkedin" + assert adapter._actor_to_source("apidojo/threads-scraper") == "threads" + assert adapter._actor_to_source("some/unknown-actor") == "apify" + + def test_parse_date_valid(self): + """Test parsing valid ISO date.""" + adapter = ApifyAdapter({"api_token": "test"}) + + result = adapter._parse_date("2024-01-15T12:00:00Z") + assert result.year == 2024 + assert result.month == 1 + assert result.day == 15 + + def test_parse_date_invalid(self): + """Test parsing invalid date returns now.""" + adapter = ApifyAdapter({"api_token": "test"}) + + result = adapter._parse_date("invalid-date") + assert result is not None + # Should be close to now + assert (datetime.now() - result).seconds < 10 + + def test_parse_date_none(self): + """Test parsing None date returns now.""" + adapter = ApifyAdapter({"api_token": "test"}) + + result = adapter._parse_date(None) + assert result is not None + + @patch("httpx.post") + def test_run_actor_raw_input(self, mock_post): + """Test run_actor with raw input dict.""" + mock_response = MagicMock() + mock_response.json.return_value = [{"data": 
"raw"}] + mock_response.raise_for_status = MagicMock() + mock_post.return_value = mock_response + + adapter = ApifyAdapter({"api_token": "test_token"}) + result = adapter.run_actor( + "custom/actor", {"customField": "customValue", "anotherField": 123} + ) + + assert result == [{"data": "raw"}] + # Verify the raw input was passed through + call_args = mock_post.call_args + assert call_args[1]["json"] == {"customField": "customValue", "anotherField": 123} + + @patch("httpx.post") + def test_fetch_signals_with_raw_actor_input(self, mock_post): + """Test fetch_signals with actor_input parameter bypasses input building.""" + mock_response = MagicMock() + mock_response.json.return_value = [ + {"id": "1", "text": "Test", "url": "https://example.com", "likeCount": 10} + ] + mock_response.raise_for_status = MagicMock() + mock_post.return_value = mock_response + + adapter = ApifyAdapter({"api_token": "test_token"}) + signals = adapter.fetch_signals( + query="ignored", # Should be ignored when actor_input provided + actor="custom/actor", + actor_input={"myCustomInput": "value", "limit": 5}, + ) + + # Verify raw input was used + call_args = mock_post.call_args + assert call_args[1]["json"] == {"myCustomInput": "value", "limit": 5} + assert len(signals) == 1 + + def test_custom_field_mapping(self): + """Test parsing with custom field mapping.""" + from kurt.integrations.research.monitoring.apify import FieldMapping + + adapter = ApifyAdapter({"api_token": "test"}) + + # Custom mapping for an actor with different field names + custom_mapping = FieldMapping( + text="body", + url="permalink", + id="postId", + score="upvotes", + comments="replies", + author="poster", + timestamp="created", + ) + + items = [ + { + "postId": "abc123", + "body": "Custom format post", + "permalink": "https://custom.com/abc123", + "upvotes": 50, + "replies": 10, + "poster": "customuser", + "created": "2024-01-15T12:00:00Z", + } + ] + + signals = adapter.parse_results(items, "custom", "query", 
custom_mapping) + + assert len(signals) == 1 + assert signals[0].signal_id == "custom_abc123" + assert signals[0].title == "Custom format post" + assert signals[0].score == 50 + assert signals[0].comment_count == 10 + assert signals[0].author == "customuser" + + def test_callable_field_mapping(self): + """Test field mapping with callable extractors.""" + from kurt.integrations.research.monitoring.apify import FieldMapping + + adapter = ApifyAdapter({"api_token": "test"}) + + # Custom mapping with callable for complex extraction + custom_mapping = FieldMapping( + text=lambda item: f"{item.get('headline', '')} - {item.get('summary', '')}", + url="link", + id="uuid", + score=lambda item: item.get("stats", {}).get("likes", 0), + comments=lambda item: item.get("stats", {}).get("comments", 0), + author=lambda item: item.get("creator", {}).get("handle"), + timestamp="publishedAt", + ) + + items = [ + { + "uuid": "xyz789", + "headline": "Breaking News", + "summary": "Important update", + "link": "https://news.com/xyz789", + "stats": {"likes": 100, "comments": 25}, + "creator": {"handle": "newsbot", "name": "News Bot"}, + "publishedAt": "2024-01-15T12:00:00Z", + } + ] + + signals = adapter.parse_results(items, "news", "query", custom_mapping) + + assert len(signals) == 1 + assert "Breaking News - Important update" in signals[0].title + assert signals[0].score == 100 + assert signals[0].comment_count == 25 + assert signals[0].author == "newsbot" + + def test_register_custom_actor(self): + """Test registering a custom actor configuration.""" + from kurt.integrations.research.monitoring.apify import ActorConfig, FieldMapping + + adapter = ApifyAdapter({"api_token": "test"}) + + # Register custom actor + custom_config = ActorConfig( + actor_id="my/custom-scraper", + source_name="custom_platform", + build_input=lambda q, n, kw: {"query": q, "count": n}, + field_mapping=FieldMapping(text="message", url="href"), + description="My custom scraper", + ) + 
adapter.register_actor(custom_config) + + # Verify it's registered + assert "my/custom-scraper" in adapter.actor_registry + assert adapter.actor_registry["my/custom-scraper"].source_name == "custom_platform" + + def test_list_actors(self): + """Test listing registered actors.""" + adapter = ApifyAdapter({"api_token": "test"}) + + actors = adapter.list_actors() + + # Should have the built-in actors + actor_ids = [a["actor_id"] for a in actors] + assert "apidojo/tweet-scraper" in actor_ids + assert "curious_coder/linkedin-post-search-scraper" in actor_ids + + # Each should have required fields + for actor in actors: + assert "actor_id" in actor + assert "source_name" in actor + assert "description" in actor + + def test_set_field_mapping_for_actor(self): + """Test setting custom field mapping for specific actor.""" + from kurt.integrations.research.monitoring.apify import FieldMapping + + adapter = ApifyAdapter({"api_token": "test"}) + + custom_mapping = FieldMapping(text="custom_text_field") + adapter.set_field_mapping("some/actor", custom_mapping) + + assert "some/actor" in adapter._custom_mappings + assert adapter._custom_mappings["some/actor"].text == "custom_text_field" + + def test_nested_field_extraction(self): + """Test extracting nested fields with dot notation.""" + adapter = ApifyAdapter({"api_token": "test"}) + + item = {"user": {"profile": {"name": "Test User"}}} + + result = adapter._get_nested(item, "user.profile.name") + assert result == "Test User" + + # Non-existent path + result = adapter._get_nested(item, "user.nonexistent.field") + assert result is None + + @patch("httpx.post") + def test_scrape_profile_twitter(self, mock_post): + """Test scraping Twitter profile.""" + mock_response = MagicMock() + mock_response.json.return_value = [ + {"id": "1", "text": "Profile tweet", "url": "https://twitter.com/1", "likeCount": 50} + ] + mock_response.raise_for_status = MagicMock() + mock_post.return_value = mock_response + + adapter = 
ApifyAdapter({"api_token": "test_token"}) + signals = adapter.scrape_profile("@elonmusk", platform="twitter", max_items=10) + + assert len(signals) == 1 + # Verify the correct actor was used + call_args = mock_post.call_args + assert "twitter-user-scraper" in call_args[0][0] + + def test_scrape_profile_unsupported_platform(self): + """Test scraping profile for unsupported platform raises error.""" + adapter = ApifyAdapter({"api_token": "test"}) + + with pytest.raises(ValueError, match="No profile scraper available"): + adapter.scrape_profile("@user", platform="tiktok") + + @patch("httpx.post") + def test_convenience_methods_accept_actor_override(self, mock_post): + """Test that convenience methods accept actor parameter.""" + mock_response = MagicMock() + mock_response.json.return_value = [ + {"id": "1", "text": "Test", "url": "https://twitter.com/1", "likeCount": 10} + ] + mock_response.raise_for_status = MagicMock() + mock_post.return_value = mock_response + + adapter = ApifyAdapter({"api_token": "test_token"}) + + # Use alternative Twitter actor + adapter.search_twitter("query", actor="quacker/twitter-scraper") + + call_args = mock_post.call_args + assert "quacker/twitter-scraper" in call_args[0][0] + + def test_init_with_custom_registry(self): + """Test initializing adapter with custom actor registry.""" + from kurt.integrations.research.monitoring.apify import ActorConfig + + custom_registry = { + "my/actor": ActorConfig( + actor_id="my/actor", source_name="my_source", description="My custom actor" + ) + } + + adapter = ApifyAdapter({"api_token": "test"}, actor_registry=custom_registry) + + # Should have both built-in and custom actors + assert "apidojo/tweet-scraper" in adapter.actor_registry # Built-in + assert "my/actor" in adapter.actor_registry # Custom diff --git a/src/kurt/workflows/__init__.py b/src/kurt/workflows/__init__.py index 3166fb25..b49f79d9 100644 --- a/src/kurt/workflows/__init__.py +++ b/src/kurt/workflows/__init__.py @@ -1 +1,52 @@ 
-"""Example workflows using the kurt framework.""" +"""Workflows for kurt - agent (.md) and YAML (.yaml) based pipelines.""" + +from .registry import ( + discover_yaml_workflows, + ensure_workflows_dir, + get_agent_workflow, + get_workflow, + get_workflow_type, + get_workflows_dir, + get_yaml_workflow, + list_agent_workflows, + list_all_workflows, + list_yaml_workflows, + validate_all_workflows, +) +from .yaml_executor import execute_yaml_workflow, run_yaml_definition +from .yaml_parser import ParsedYamlWorkflow, parse_yaml_workflow, validate_yaml_workflow +from .yaml_tables import ( + clear_generated_models, + generate_sqlmodel_class, + generate_workflow_tables, + get_generated_model, + list_generated_models, +) + +__all__ = [ + # Registry + "get_workflows_dir", + "list_agent_workflows", + "get_agent_workflow", + "list_yaml_workflows", + "get_yaml_workflow", + "list_all_workflows", + "get_workflow", + "get_workflow_type", + "validate_all_workflows", + "ensure_workflows_dir", + "discover_yaml_workflows", + # YAML Parser + "ParsedYamlWorkflow", + "parse_yaml_workflow", + "validate_yaml_workflow", + # YAML Tables + "generate_sqlmodel_class", + "generate_workflow_tables", + "get_generated_model", + "list_generated_models", + "clear_generated_models", + # YAML Executor + "execute_yaml_workflow", + "run_yaml_definition", +] diff --git a/src/kurt/workflows/signals/config.py b/src/kurt/workflows/signals/config.py index d839ff33..ac6b5104 100644 --- a/src/kurt/workflows/signals/config.py +++ b/src/kurt/workflows/signals/config.py @@ -10,8 +10,8 @@ class SignalsConfig(StepConfig): """Configuration for signals monitoring workflow.""" - source: Literal["reddit", "hackernews", "feeds"] = ConfigParam( - description="Signal source: reddit, hackernews, or feeds" + source: Literal["reddit", "hackernews", "feeds", "apify"] = ConfigParam( + description="Signal source: reddit, hackernews, feeds, or apify" ) # Reddit options @@ -26,6 +26,16 @@ class SignalsConfig(StepConfig): 
description="RSS/Atom feed URL", ) + # Apify options + apify_query: str | None = ConfigParam( + default=None, + description="Search query for Apify (required if source=apify)", + ) + apify_platform: Literal["twitter", "linkedin", "threads"] = ConfigParam( + default="twitter", + description="Social platform to search via Apify", + ) + # Common filters keywords: str | None = ConfigParam( default=None, diff --git a/src/kurt/workflows/signals/steps.py b/src/kurt/workflows/signals/steps.py index b088d74d..48fe0379 100644 --- a/src/kurt/workflows/signals/steps.py +++ b/src/kurt/workflows/signals/steps.py @@ -10,6 +10,7 @@ from kurt.db import managed_session from kurt.integrations.research.monitoring import ( + ApifyAdapter, FeedAdapter, HackerNewsAdapter, RedditAdapter, @@ -94,6 +95,47 @@ def fetch_signals_step(config_dict: dict[str, Any]) -> dict[str, Any]: limit=config.limit, ) + elif config.source == "apify": + if not config.apify_query: + raise ValueError("apify_query is required for apify source") + + # Get Apify config from research config + from kurt.integrations.research.config import get_source_config, source_configured + + if not source_configured("apify"): + raise ValueError( + "Apify not configured. 
Run: kurt integrations research onboard --source apify" + ) + + apify_config = get_source_config("apify") + adapter = ApifyAdapter(apify_config) + + # Use platform-specific method + if config.apify_platform == "twitter": + signals = adapter.search_twitter( + query=config.apify_query, + max_items=config.limit, + keywords=keywords or None, + ) + elif config.apify_platform == "linkedin": + signals = adapter.search_linkedin( + query=config.apify_query, + max_items=config.limit, + keywords=keywords or None, + ) + elif config.apify_platform == "threads": + signals = adapter.search_threads( + query=config.apify_query, + max_items=config.limit, + keywords=keywords or None, + ) + else: + signals = adapter.fetch_signals( + query=config.apify_query, + max_items=config.limit, + keywords=keywords or None, + ) + else: raise ValueError(f"Unknown signal source: {config.source}") diff --git a/technically-newsroom-vision.md b/technically-newsroom-vision.md new file mode 100644 index 00000000..511b04cd --- /dev/null +++ b/technically-newsroom-vision.md @@ -0,0 +1,1533 @@ +# Technically Newsroom: Automated Content Pipeline +## Vision Document & Technical Specification + +--- + +## Executive Summary + +This document describes an automated newsroom system for Technically, built as an extension to [kurt-core](https://github.com/boringdata/kurt-core). 
The system handles the full content lifecycle: + +**Topic Discovery → Brief Creation → Writer Assignment → Visual/Video Production → Publishing → Distribution → Analytics** + +The architecture prioritizes: +- **Deterministic steps** where possible (API calls, transforms) to reduce token consumption +- **LLM batch steps** for classification/scoring (parallel, tracked, cost-efficient) +- **Agentic execution** only when complex reasoning/tool use is required +- **Human-in-the-loop** at key decision points (topic approval, editorial review, correspondent recording) + +Built on kurt-core's DBOS workflow foundation, with custom workflows, models, and integrations defined in user-space. + +--- + +## Table of Contents + +1. [System Overview](#system-overview) +2. [Macro Workflows](#macro-workflows) +3. [Micro Workflows (Step Details)](#micro-workflows) +4. [Database Models](#database-models) +5. [Integration Requirements](#integration-requirements) +6. [CLI Commands](#cli-commands) +7. [Technical Architecture](#technical-architecture) +8. [Implementation Phases](#implementation-phases) + +--- + +## System Overview + +### Why Kurt-Core? + +Kurt-core provides: +- **DBOS workflows** — Durable, observable, recoverable execution +- **LLMStep** — Batch LLM calls with concurrency, tracking, cost metrics +- **Agent execution** — Claude CLI subprocess with guardrails +- **TracingHooks** — Token/cost tracking to database +- **Existing integrations** — Reddit, HN, RSS signals as starting points + +We extend kurt with custom workflows, models, and integrations for newsroom operations. 
+ +### Workflow Types + +| Type | When to Use | Token Cost | +|------|-------------|------------| +| `@DBOS.step()` | Deterministic Python (API calls, transforms) | Zero | +| `LLMStep` | Batch classification/scoring | ~$0.01-0.05 per item | +| `agent_execution_step` | Complex reasoning, tool use, research | ~$0.10-1.00 per run | + +### Human-in-the-Loop Gates + +| Gate | Decision Maker | Mechanism | +|------|----------------|-----------| +| Topic approval | Editor | Slack interactive buttons | +| Brief approval | Editor | Asana task status | +| Visual approval | Designer (optional) | Review queue | +| Publish approval | Editor | CMS workflow | +| Correspondent recording | Correspondent | Manual (calendar scheduled) | + +--- + +## Macro Workflows + +### Overview + +``` +┌─────────────────────────────────────────────────────────────────────────────────┐ +│ MACRO WORKFLOW MAP │ +├─────────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ M1: TOPIC PIPELINE M2: BRIEF & ASSIGNMENT M3: PRODUCTION │ +│ ┌───────────────────┐ ┌───────────────────┐ ┌──────────────┐ │ +│ │ Sources → Scored │ │ Topic → Writer │ │ Brief → Assets│ │ +│ │ Topics → Approved │ │ Assignment │ │ (Visual+Video)│ │ +│ │ │ │ │ │ │ │ +│ │ Scheduled: 15 min │ │ Triggered: Manual │ │ Triggered: │ │ +│ │ + Daily digest │ │ or on approval │ │ On approval │ │ +│ └───────────────────┘ └───────────────────┘ └──────────────┘ │ +│ │ +│ M4: PUBLISHING M5: DISTRIBUTION M6: ANALYTICS │ +│ ┌───────────────────┐ ┌───────────────────┐ ┌──────────────┐ │ +│ │ Draft → CMS → │ │ Published → │ │ Performance │ │ +│ │ Scheduled Publish │ │ Social + Newsletter│ │ → Feedback │ │ +│ │ │ │ │ │ → Tuning │ │ +│ │ Triggered: On │ │ Triggered: On │ │ │ │ +│ │ editor approval │ │ publish │ │ Weekly cycle │ │ +│ └───────────────────┘ └───────────────────┘ └──────────────┘ │ +│ │ +│ M7: CORRESPONDENT RECRUITING │ +│ ┌───────────────────────────────────────────────────────────────────────────┐ │ +│ │ 
Topic/Beat → Find Experts on Social → Score/Rank → Outreach Pipeline │ │ +│ │ │ │ +│ │ Triggered: On demand or when new beat identified │ │ +│ └───────────────────────────────────────────────────────────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────────────────────┘ +``` + +--- + +### M1: Topic Pipeline + +**Purpose:** Continuously discover, score, and surface story-worthy topics. + +**Schedule:** +- Ingestion: Every 15 minutes +- Digest: Daily at 9am + +**Pipeline:** + +``` +┌─────────────────────────────────────────────────────────────────────────────────┐ +│ M1: TOPIC PIPELINE │ +├─────────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ FETCH (Parallel, Pure Python) │ +│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ +│ │ RSS │ │ Reddit │ │ HN │ │ Twitter │ │ GitHub │ │ +│ │ Feeds │ │Subreddits│ │ Front │ │ Lists │ │ Trending │ │ +│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ +│ │ │ │ │ │ │ +│ └────────────┴────────────┴────────────┴────────────┘ │ +│ │ │ +│ ▼ │ +│ DEDUPE (Pure Python) ┌─────────────┐ │ +│ │ Dedupe │ │ +│ │ vs. 
Known │ │ +│ └──────┬──────┘ │ +│ │ │ +│ ▼ │ +│ SCORE (LLMStep - Batch) ┌─────────────┐ │ +│ │ Score: │ │ +│ │ • Relevance │ ~$0.01/topic │ +│ │ • Timeliness│ │ +│ │ • Uniqueness│ │ +│ │ • Audience │ │ +│ └──────┬──────┘ │ +│ │ │ +│ ▼ │ +│ PERSIST (Transaction) ┌─────────────┐ │ +│ │ Save to │ │ +│ │ Database │ │ +│ └──────┬──────┘ │ +│ │ │ +│ ┌────────────┴────────────┐ │ +│ ▼ ▼ │ +│ NOTIFY ┌──────────┐ ┌──────────┐ │ +│ │ Daily │ │ Alert │ │ +│ │ Digest │ │ (Score │ │ +│ │ (9am) │ │ > 85) │ │ +│ └──────────┘ └──────────┘ │ +│ │ +│ Output: Slack with [Approve] [Reject] [Assign] buttons │ +│ │ +└─────────────────────────────────────────────────────────────────────────────────┘ +``` + +**Scoring Dimensions (0-100 each):** + +| Dimension | Weight | Description | +|-----------|--------|-------------| +| Relevance | 30% | How relevant to Technically's beat? | +| Timeliness | 25% | Breaking news (high) vs. evergreen (lower) | +| Uniqueness | 20% | Already covered elsewhere? | +| Audience Fit | 15% | Would our startup/developer audience care? | +| Competition | 10% | Can we add unique value vs competitors? | + +--- + +### M2: Brief & Assignment + +**Purpose:** Generate research brief, create writing assignment, notify writer. 
+ +**Trigger:** Editor approves topic (Slack button) or auto-approval (score > 90) + +**Pipeline:** + +``` +┌─────────────────────────────────────────────────────────────────────────────────┐ +│ M2: BRIEF & ASSIGNMENT │ +├─────────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ INPUT ┌─────────────┐ │ +│ │ Approved │ │ +│ │ Topic │ │ +│ └──────┬──────┘ │ +│ │ │ +│ ▼ │ +│ RESEARCH (Agent Step) ┌─────────────────────────────────────────────┐ │ +│ │ Deep Research Agent │ │ +│ │ • Find primary sources │ │ +│ │ • Gather expert perspectives │ │ +│ │ • Collect data/statistics │ │ +│ │ • Identify interview contacts │ │ +│ │ │ │ +│ │ Tools: WebSearch, WebFetch, Read, Write │ │ +│ │ Output: research-{topic}.md │ │ +│ └──────────────┬──────────────────────────────┘ │ +│ │ │ +│ ▼ │ +│ COMPETITOR SCAN ┌─────────────────────────────────────────────┐ │ +│ (Pure Python) │ Search: TechCrunch, Verge, Ars, Wired │ │ +│ │ Extract: angles, coverage gaps │ │ +│ └──────────────┬──────────────────────────────┘ │ +│ │ │ +│ ▼ │ +│ GENERATE BRIEF ┌─────────────────────────────────────────────┐ │ +│ (LLMStep) │ Create structured brief: │ │ +│ │ • 3 headline options │ │ +│ │ • Story angle/hook │ │ +│ │ • Key facts (with sources) │ │ +│ │ • SEO keywords │ │ +│ │ • Visual suggestions │ │ +│ │ • Suggested word count │ │ +│ └──────────────┬──────────────────────────────┘ │ +│ │ │ +│ ┌────────────────────┼────────────────────┐ │ +│ ▼ ▼ ▼ │ +│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ +│ │ Create │ │ Create │ │ Notify │ │ +│ │ Google │ │ Asana │ │ Writer │ │ +│ │ Doc │ │ Task │ │ (Slack) │ │ +│ └──────────┘ └──────────┘ └──────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────────────────────┘ +``` + +**Brief Structure:** + +```markdown +# Brief: {Title} + +## Story Angle +{One paragraph on the unique angle} + +## Headline Options +1. {Option 1} +2. {Option 2} +3. 
{Option 3} + +## Key Facts +- {Fact 1} (Source: {url}) +- {Fact 2} (Source: {url}) +... + +## Sources +### Primary +- {url}: {why it matters} +### Secondary +- {url}: {context} + +## Competitor Coverage +| Publication | Angle | Gap for Us | +|-------------|-------|------------| +| ... | ... | ... | + +## Interview Contacts +- {Name}, {Title} at {Company} - {Twitter/LinkedIn} + +## SEO +- Target keywords: {kw1}, {kw2}, {kw3} +- Suggested word count: {n} + +## Visual Needs +- [ ] Featured image: {concept} +- [ ] Diagram: {concept} +- [ ] Video: {yes/no} +``` + +--- + +### M3: Production (Visual & Video) + +**Purpose:** Generate all visual assets and prepare video pre-production. + +#### M3a: Visual Asset Pipeline + +``` +┌─────────────────────────────────────────────────────────────────────────────────┐ +│ M3a: VISUAL ASSET PIPELINE │ +├─────────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ FEATURED IMAGE │ +│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ +│ │ Generate │───▶│ DALL-E │───▶│ Apply │───▶│ Upload │ │ +│ │ Prompt │ │ Generate │ │ Brand │ │Cloudinary│ │ +│ │ (LLMStep)│ │ (API) │ │ Template │ │ │ │ +│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ +│ │ +│ TECHNICAL DIAGRAMS (Technically's Signature Style) │ +│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌─────────┐ │ +│ │ Generate │───▶│ Render │───▶│ Nano │───▶│ Add │───▶│ Upload │ │ +│ │ Mermaid │ │ Mermaid │ │ Banana 3 │ │ Branding │ │Cloudinary│ │ +│ │ (LLMStep)│ │ (CLI) │ │ Style │ │ │ │ │ │ +│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ └─────────┘ │ +│ │ +│ DATA VISUALIZATIONS │ +│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ +│ │ Extract │───▶│ Generate │───▶│ Upload │ │ +│ │ Data │ │ Chart │ │Cloudinary│ │ +│ │ │ │(Flourish)│ │ │ │ +│ └──────────┘ └──────────┘ └──────────┘ │ +│ │ +│ SOCIAL CARDS │ +│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ +│ │ Generate │───▶│ Apply │───▶│ Upload │ │ +│ │ Copy │ │ Template │ │Cloudinary│ │ +│ │ 
(LLMStep)│ │ (Canva) │ │ │ │ +│ └──────────┘ └──────────┘ └──────────┘ │ +│ │ +│ OUTPUT: All variants in DAM │ +│ └── story-slug/ │ +│ ├── featured-1200x630.png (OG/social) │ +│ ├── hero-1920x1080.png │ +│ ├── thumb-400x300.png │ +│ ├── diagrams/ │ +│ └── social/ │ +│ │ +└─────────────────────────────────────────────────────────────────────────────────┘ +``` + +#### M3b: Video Pre-Production + +**Important:** Correspondents record their own video (home studio or iPhone). This workflow prepares everything they need. + +``` +┌─────────────────────────────────────────────────────────────────────────────────┐ +│ M3b: VIDEO PRE-PRODUCTION │ +├─────────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ SCRIPT GENERATION │ +│ ┌──────────────────────────────────────────────────────────────────────┐ │ +│ │ Input: Article content │ │ +│ │ Output: 90-second video script │ │ +│ │ │ │ +│ │ Structure: │ │ +│ │ • Hook (5 sec) │ │ +│ │ • Context (15 sec) │ │ +│ │ • Key Point 1 (20 sec) │ │ +│ │ • Key Point 2 (20 sec) │ │ +│ │ • Key Point 3 (20 sec) │ │ +│ │ • CTA (10 sec) │ │ +│ │ │ │ +│ │ Tone: Conversational but authoritative │ │ +│ └──────────────────────────────────────────────────────────────────────┘ │ +│ │ │ +│ ▼ │ +│ SHOT LIST GENERATION │ +│ ┌──────────────────────────────────────────────────────────────────────┐ │ +│ │ Break script into shots: │ │ +│ │ │ │ +│ │ | # | Type | Duration | Content | Visual | │ │ +│ │ |---|-----------|----------|-------------------|-----------------| │ │ +│ │ | 1 | Talking | 5s | Hook | Correspondent | │ │ +│ │ | 2 | B-roll | 10s | Context | Diagram 1 | │ │ +│ │ | 3 | Talking | 15s | Point 1 | Correspondent | │ │ +│ │ | 4 | B-roll | 5s | Point 1 visual | Diagram 2 | │ │ +│ │ | ... 
| │ │ +│ └──────────────────────────────────────────────────────────────────────┘ │ +│ │ │ +│ ▼ │ +│ DIAGRAM GENERATION │ +│ ┌──────────────────────────────────────────────────────────────────────┐ │ +│ │ For each shot requiring diagram: │ │ +│ │ → Call M3a diagram pipeline │ │ +│ │ → Output: Styled diagrams ready for video │ │ +│ └──────────────────────────────────────────────────────────────────────┘ │ +│ │ │ +│ ▼ │ +│ SCHEDULING │ +│ ┌──────────────────────────────────────────────────────────────────────┐ │ +│ │ • Check correspondent availability │ │ +│ │ • Create calendar event with: │ │ +│ │ - Script link │ │ +│ │ - Shot list link │ │ +│ │ - Diagram assets │ │ +│ │ - Recording instructions │ │ +│ │ • Send Slack notification │ │ +│ └──────────────────────────────────────────────────────────────────────┘ │ +│ │ +│ POST-PRODUCTION (After correspondent uploads footage) │ +│ ┌──────────────────────────────────────────────────────────────────────┐ │ +│ │ • Ingest footage to Descript │ │ +│ │ • Auto-transcription │ │ +│ │ • Editor assembles rough cut │ │ +│ │ • Export → YouTube → Embed in article │ │ +│ └──────────────────────────────────────────────────────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────────────────────┘ +``` + +--- + +### M4: Publishing + +**Purpose:** Transform approved content into CMS draft, schedule publication. 
+ +**Trigger:** Editor marks brief as "ready to publish" + +``` +┌─────────────────────────────────────────────────────────────────────────────────┐ +│ M4: PUBLISHING │ +├─────────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌──────────────┐ │ +│ │ Google Doc │ (Writer's finished article) │ +│ │ (Approved) │ │ +│ └──────┬───────┘ │ +│ │ │ +│ ▼ │ +│ ┌──────────────┐ │ +│ │ Transform │ • Fetch via Google Docs API │ +│ │ to Markdown │ • Clean formatting │ +│ │ │ • Extract metadata │ +│ └──────┬───────┘ │ +│ │ │ +│ ▼ │ +│ ┌──────────────┐ │ +│ │ Generate │ • Meta title (optimized for CTR) │ +│ │ SEO │ • Meta description │ +│ │ Metadata │ • Schema markup (Article, Author, etc.) │ +│ │ (LLMStep) │ │ +│ └──────┬───────┘ │ +│ │ │ +│ ▼ │ +│ ┌──────────────┐ │ +│ │ Attach │ • Featured image from DAM │ +│ │ Assets │ • Diagrams │ +│ │ │ • Video embed (if applicable) │ +│ └──────┬───────┘ │ +│ │ │ +│ ▼ │ +│ ┌──────────────┐ │ +│ │ Create CMS │ • Push to Sanity via API │ +│ │ Draft │ • Generate preview URL │ +│ │ │ • Set scheduled publish time │ +│ └──────┬───────┘ │ +│ │ │ +│ ▼ │ +│ ┌──────────────┐ │ +│ │ Notify │ • Slack to editor with preview link │ +│ │ Editor │ • "Review and publish" prompt │ +│ └──────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────────────────────┘ +``` + +--- + +### M5: Distribution + +**Purpose:** Amplify published content across channels. 
+ +**Trigger:** Article published (CMS webhook) + +**Timing:** +- T+0: Website, Twitter thread, LinkedIn +- T+2h: Community posts (Discord, Reddit if appropriate) +- T+24h: Newsletter digest inclusion +- T+24h: Twitter reminder + +``` +┌─────────────────────────────────────────────────────────────────────────────────┐ +│ M5: DISTRIBUTION │ +├─────────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ IMMEDIATE (T+0) │ +│ ┌─────────────────────────────────────────────────────────────────────────┐ │ +│ │ │ │ +│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ +│ │ │ Generate │ │ Generate │ │ Post │ │ │ +│ │ │ Twitter │ │ LinkedIn │ │ Threads │ │ │ +│ │ │ Thread │ │ Post │ │ (Parallel) │ │ │ +│ │ │ (LLMStep) │ │ (LLMStep) │ │ │ │ │ +│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │ +│ │ │ │ +│ │ Twitter Thread Format: │ │ +│ │ 1. Hook + link │ │ +│ │ 2. Key insight 1 │ │ +│ │ 3. Key insight 2 │ │ +│ │ 4. Key insight 3 │ │ +│ │ 5. CTA + link │ │ +│ │ │ │ +│ └─────────────────────────────────────────────────────────────────────────┘ │ +│ │ +│ SAME DAY (T+2h) │ +│ ┌─────────────────────────────────────────────────────────────────────────┐ │ +│ │ • Post to Discord community │ │ +│ │ • Post to relevant Slack communities (if applicable) │ │ +│ │ • Submit to HN/Reddit (manual review queue) │ │ +│ └─────────────────────────────────────────────────────────────────────────┘ │ +│ │ +│ NEXT DAY (T+24h) │ +│ ┌─────────────────────────────────────────────────────────────────────────┐ │ +│ │ • Add to newsletter digest queue │ │ +│ │ • Post Twitter reminder: "In case you missed it..." │ │ +│ └─────────────────────────────────────────────────────────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────────────────────┘ +``` + +--- + +### M6: Analytics & Feedback Loop + +**Purpose:** Track performance, identify patterns, tune scoring. 
+ +**Schedule:** Weekly (Monday morning) + +``` +┌─────────────────────────────────────────────────────────────────────────────────┐ +│ M6: ANALYTICS & FEEDBACK LOOP │ +├─────────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ DATA COLLECTION (Continuous) │ +│ ┌─────────────────────────────────────────────────────────────────────────┐ │ +│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ +│ │ │ GA4 │ │ Search │ │ Twitter │ │LinkedIn │ │Newsletter│ │ │ +│ │ │ │ │ Console │ │ API │ │ API │ │ API │ │ │ +│ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ │ +│ │ └───────────┴───────────┴───────────┴───────────┘ │ │ +│ │ │ │ │ +│ │ ▼ │ │ +│ │ ┌─────────────────┐ │ │ +│ │ │ Article Metrics │ │ │ +│ │ │ Table │ │ │ +│ │ └─────────────────┘ │ │ +│ └─────────────────────────────────────────────────────────────────────────┘ │ +│ │ +│ WEEKLY ANALYSIS │ +│ ┌─────────────────────────────────────────────────────────────────────────┐ │ +│ │ │ │ +│ │ Performance Report: │ │ +│ │ ┌───────────────────────────────────────────────────────────────────┐ │ │ +│ │ │ TOP PERFORMERS │ │ │ +│ │ │ • Which topics drove most traffic? │ │ │ +│ │ │ • Which formats had best engagement? │ │ │ +│ │ │ • Which distribution channels performed? │ │ │ +│ │ │ • Which correspondents' videos performed best? 
│ │ │ +│ │ │ │ │ │ +│ │ │ UNDERPERFORMERS │ │ │ +│ │ │ • Topics with high score but low actual traffic │ │ │ +│ │ │ • High bounce rate articles │ │ │ +│ │ │ • Low video completion rates │ │ │ +│ │ │ │ │ │ +│ │ │ SEO OPPORTUNITIES │ │ │ +│ │ │ • Keywords ranking 5-15 (optimize potential) │ │ │ +│ │ │ • New ranking keywords to double down on │ │ │ +│ │ └───────────────────────────────────────────────────────────────────┘ │ │ +│ │ │ │ +│ │ │ │ │ +│ │ ▼ │ │ +│ │ │ │ +│ │ Feedback Actions: │ │ +│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ +│ │ │ Update │ │ Adjust │ │ Notify │ │ │ +│ │ │ Scoring │ │ Content │ │ Team │ │ │ +│ │ │ Weights │ │ Strategy │ │ (Slack) │ │ │ +│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │ +│ │ │ │ +│ └─────────────────────────────────────────────────────────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────────────────────┘ +``` + +**Metrics Tracked Per Article:** + +| Category | Metrics | +|----------|---------| +| Traffic | Pageviews, unique visitors, avg time on page, bounce rate | +| SEO | Organic traffic, keyword rankings, backlinks | +| Social | Impressions, engagements, shares, clicks | +| Video | Views, completion rate, avg watch time | +| Newsletter | Open rate, click rate | +| Business | Newsletter signups, lead captures | + +--- + +### M7: Correspondent Recruiting + +**Purpose:** Find subject matter experts for stories and potential correspondents. + +**Trigger:** On demand, or when new beat identified + +``` +┌─────────────────────────────────────────────────────────────────────────────────┐ +│ M7: CORRESPONDENT RECRUITING │ +├─────────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ INPUT │ +│ ┌─────────────────────────────────────────────────────────────────────────┐ │ +│ │ • Topic or beat to find experts for │ │ +│ │ • Expertise criteria (technical depth, communication skills, etc.) 
│ │ +│ │ • Geography preferences (optional) │ │ +│ └─────────────────────────────────────────────────────────────────────────┘ │ +│ │ │ +│ ▼ │ +│ DISCOVERY (Parallel, via Apify actors) │ +│ ┌─────────────────────────────────────────────────────────────────────────┐ │ +│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ +│ │ │ Twitter │ │ LinkedIn │ │ GitHub │ │ Podcast │ │ │ +│ │ │ Search │ │ Search │ │ Contrib. │ │ Guests │ │ │ +│ │ │ │ │ │ │ Search │ │ │ │ │ +│ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ +│ │ └─────────────┴─────────────┴─────────────┘ │ │ +│ │ │ │ │ +│ │ ▼ │ │ +│ │ ┌─────────────────┐ │ │ +│ │ │ Raw Candidate │ │ │ +│ │ │ List │ │ │ +│ │ └─────────────────┘ │ │ +│ └─────────────────────────────────────────────────────────────────────────┘ │ +│ │ │ +│ ▼ │ +│ ENRICHMENT (Pure Python) │ +│ ┌─────────────────────────────────────────────────────────────────────────┐ │ +│ │ For each candidate: │ │ +│ │ • Follower count, engagement rate │ │ +│ │ • Recent content themes │ │ +│ │ • Bio/headline keywords │ │ +│ │ • Cross-platform presence │ │ +│ └─────────────────────────────────────────────────────────────────────────┘ │ +│ │ │ +│ ▼ │ +│ SCORING (LLMStep) │ +│ ┌─────────────────────────────────────────────────────────────────────────┐ │ +│ │ Score each candidate on: │ │ +│ │ • Relevance to topic/beat │ │ +│ │ • Authority (credentials, following, engagement) │ │ +│ │ • Accessibility (active on socials, responds to DMs) │ │ +│ │ • Content quality (writing samples, video presence) │ │ +│ │ • Fit with Technically's voice │ │ +│ └─────────────────────────────────────────────────────────────────────────┘ │ +│ │ │ +│ ▼ │ +│ OUTREACH PREP (LLMStep) │ +│ ┌─────────────────────────────────────────────────────────────────────────┐ │ +│ │ Generate personalized outreach for top candidates: │ │ +│ │ • Reference their specific work │ │ +│ │ • Explain the opportunity │ │ +│ │ • Clear CTA │ │ +│ │ │ │ +│ │ Output: Draft messages for human review 
before sending │ │ +│ └─────────────────────────────────────────────────────────────────────────┘ │ +│ │ +│ OUTPUT │ +│ ┌─────────────────────────────────────────────────────────────────────────┐ │ +│ │ • Ranked candidate list in `technically_experts` table │ │ +│ │ • Draft outreach messages (queued for manual send) │ │ +│ │ • Slack notification with top 10 candidates │ │ +│ └─────────────────────────────────────────────────────────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────────────────────┘ +``` + +--- + +## Micro Workflows + +### Step Type Reference + +| Type | Decorator | Use Case | Cost | +|------|-----------|----------|------| +| Pure Python | `@DBOS.step()` | API calls, transforms, I/O | Free | +| LLM Batch | `LLMStep` | Classification, scoring, generation | ~$0.01-0.05/item | +| Transaction | `@DBOS.transaction()` | Database writes | Free | +| Agent | `agent_execution_step()` | Complex reasoning, tool use | ~$0.10-1.00/run | + +### Complete Step Catalog + +#### Topic Pipeline (M1) + +| ID | Step Name | Type | Input | Output | +|----|-----------|------|-------|--------| +| T1 | `fetch_rss_step` | DBOS.step | RSS feed URLs | Raw topics | +| T2 | `fetch_reddit_step` | DBOS.step | Subreddit list | Raw topics | +| T3 | `fetch_hackernews_step` | DBOS.step | Config (timeframe, limit) | Raw topics | +| T4 | `fetch_twitter_step` | DBOS.step | Lists, hashtags, accounts | Raw topics | +| T5 | `fetch_github_trending_step` | DBOS.step | Language, timeframe | Raw topics | +| T6 | `dedupe_topics_step` | DBOS.step | Raw topics | New topics only | +| T7 | `score_topics_step` | LLMStep | Topics | Scored topics | +| T8 | `persist_topics` | DBOS.transaction | Scored topics | DB records | +| T9 | `post_slack_digest_step` | DBOS.step | Top topics | Slack message ID | +| T10 | `post_slack_alert_step` | DBOS.step | High-score topic | Slack message ID | + +#### Brief & Assignment (M2) + +| ID | Step Name | Type | Input | Output | 
+|----|-----------|------|-------|--------| +| B1 | `fetch_topic_step` | DBOS.step | Topic ID | Topic details | +| B2 | `research_topic_step` | Agent | Topic | Research doc | +| B3 | `find_competitors_step` | DBOS.step | Topic | Competitor coverage | +| B4 | `generate_brief_step` | LLMStep | Topic + research | Brief content | +| B5 | `create_google_doc_step` | DBOS.step | Brief | Doc URL | +| B6 | `create_asana_task_step` | DBOS.step | Brief + Doc URL | Task ID | +| B7 | `persist_brief` | DBOS.transaction | Brief data | DB record | +| B8 | `notify_writer_step` | DBOS.step | Brief + Writer | Slack DM ID | + +#### Visual Production (M3a) + +| ID | Step Name | Type | Input | Output | +|----|-----------|------|-------|--------| +| V1 | `analyze_visual_needs_step` | LLMStep | Brief | Asset requirements | +| V2 | `generate_image_prompt_step` | LLMStep | Brief/article | DALL-E prompt | +| V3 | `generate_image_step` | DBOS.step | Prompt | Raw image path | +| V4 | `apply_brand_template_step` | DBOS.step | Image | Branded image | +| V5 | `generate_mermaid_step` | LLMStep | Concept | Mermaid code | +| V6 | `render_mermaid_step` | DBOS.step | Mermaid code | Base PNG | +| V7 | `style_transfer_step` | DBOS.step | Base PNG + style | Styled PNG | +| V8 | `upload_cloudinary_step` | DBOS.step | Image + config | Cloudinary URLs | +| V9 | `persist_visual_asset` | DBOS.transaction | Asset data | DB record | + +#### Video Production (M3b) + +| ID | Step Name | Type | Input | Output | +|----|-----------|------|-------|--------| +| VD1 | `generate_script_step` | LLMStep | Article | Video script | +| VD2 | `generate_shot_list_step` | LLMStep | Script | Shot list | +| VD3 | `identify_diagram_needs_step` | DBOS.step | Shot list | Diagram concepts | +| VD4 | `schedule_recording_step` | DBOS.step | Correspondent + time | Calendar event | +| VD5 | `notify_correspondent_step` | DBOS.step | Video project | Slack/email | +| VD6 | `persist_video_project` | DBOS.transaction | Project data | DB 
record | +| VD7 | `ingest_footage_step` | DBOS.step | Footage URL | Descript project | +| VD8 | `upload_youtube_step` | DBOS.step | Final video | YouTube ID | + +#### Publishing (M4) + +| ID | Step Name | Type | Input | Output | +|----|-----------|------|-------|--------| +| P1 | `fetch_google_doc_step` | DBOS.step | Doc URL | Raw content | +| P2 | `transform_to_markdown_step` | DBOS.step | Raw content | Clean markdown | +| P3 | `generate_seo_metadata_step` | LLMStep | Article | Meta title, desc, schema | +| P4 | `attach_assets_step` | DBOS.step | Brief ID | Asset URLs | +| P5 | `create_cms_draft_step` | DBOS.step | Content + assets | CMS ID + preview URL | +| P6 | `schedule_publish_step` | DBOS.step | CMS ID + time | Scheduled | +| P7 | `persist_article` | DBOS.transaction | Article data | DB record | +| P8 | `notify_editor_step` | DBOS.step | Article + preview | Slack message | + +#### Distribution (M5) + +| ID | Step Name | Type | Input | Output | +|----|-----------|------|-------|--------| +| D1 | `generate_twitter_thread_step` | LLMStep | Article | Thread tweets | +| D2 | `generate_linkedin_post_step` | LLMStep | Article | Post content | +| D3 | `generate_newsletter_entry_step` | LLMStep | Article | Newsletter blurb | +| D4 | `post_twitter_step` | DBOS.step | Thread | Tweet IDs | +| D5 | `post_linkedin_step` | DBOS.step | Post | Post ID | +| D6 | `queue_newsletter_step` | DBOS.step | Entry | Queue ID | +| D7 | `post_community_step` | DBOS.step | Summary | Post IDs | +| D8 | `schedule_reminder_step` | DBOS.step | Article | Scheduled job ID | +| D9 | `persist_social_posts` | DBOS.transaction | Post data | DB records | + +#### Analytics (M6) + +| ID | Step Name | Type | Input | Output | +|----|-----------|------|-------|--------| +| A1 | `sync_ga4_step` | DBOS.step | Date range | Traffic metrics | +| A2 | `sync_search_console_step` | DBOS.step | Date range | SEO metrics | +| A3 | `sync_twitter_metrics_step` | DBOS.step | Post IDs | Engagement data | +| A4 | 
`sync_linkedin_metrics_step` | DBOS.step | Post IDs | Engagement data | +| A5 | `sync_newsletter_metrics_step` | DBOS.step | Campaign IDs | Email metrics | +| A6 | `aggregate_performance_step` | DBOS.step | All metrics | Per-article scores | +| A7 | `analyze_patterns_step` | LLMStep | Performance data | Insights | +| A8 | `generate_weekly_report_step` | LLMStep | Analysis | Report markdown | +| A9 | `update_scoring_weights_step` | DBOS.step | Performance vs predictions | New weights | +| A10 | `persist_metrics` | DBOS.transaction | Metrics | DB records | +| A11 | `notify_team_step` | DBOS.step | Report | Slack message | + +#### Recruiting (M7) + +| ID | Step Name | Type | Input | Output | +|----|-----------|------|-------|--------| +| R1 | `identify_expertise_step` | LLMStep | Topic/beat | Search criteria | +| R2 | `search_twitter_step` | DBOS.step | Criteria | Raw candidates | +| R3 | `search_linkedin_step` | DBOS.step | Criteria | Raw candidates | +| R4 | `search_github_step` | DBOS.step | Criteria | Raw candidates | +| R5 | `enrich_profiles_step` | DBOS.step | Candidates | Enriched profiles | +| R6 | `score_candidates_step` | LLMStep | Profiles | Scored candidates | +| R7 | `generate_outreach_step` | LLMStep | Top candidates | Draft messages | +| R8 | `persist_experts` | DBOS.transaction | Candidate data | DB records | +| R9 | `notify_recruiting_step` | DBOS.step | Top candidates | Slack message | + +--- + +## Database Models + +### Entity Relationship Overview + +``` +┌─────────────────────────────────────────────────────────────────────────────────┐ +│ DATABASE SCHEMA │ +├─────────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ +│ │ TopicSource │ │ Topic │ │ Brief │ │ +│ │ │────────▶│ │────────▶│ │ │ +│ │ • RSS feeds │ has │ • title │ becomes │ • headlines │ │ +│ │ • Subreddits │ many │ • scores │ │ • research │ │ +│ │ • Twitter lists│ │ • status │ │ • sources │ │ 
+│ └─────────────────┘ └────────┬────────┘ └────────┬────────┘ │ +│ │ │ │ +│ │ │ │ +│ ┌────────▼────────┐ ┌────────▼────────┐ │ +│ │ Writer │ │ VisualAsset │ │ +│ │ │◀────────│ │ │ +│ │ • name │ assigned│ • type │ │ +│ │ • beats │ to │ • cloudinary │ │ +│ │ • capacity │ │ • mermaid_code │ │ +│ └─────────────────┘ └─────────────────┘ │ +│ │ │ +│ ┌────────▼────────┐ │ +│ │ VideoProject │ │ +│ │ │ │ +│ │ • script │ │ +│ │ • shot_list │ │ +│ │ • correspondent│ │ +│ └────────┬────────┘ │ +│ │ │ +│ ┌─────────────────┐ ┌─────────────────┐ ┌───────▼─────────┐ │ +│ │ SocialPost │◀────────│ Article │◀────────│ │ │ +│ │ │ has │ │ produces│ (Brief) │ │ +│ │ • platform │ many │ • content │ │ │ │ +│ │ • external_id │ │ • cms_id │ │ │ │ +│ │ • metrics │ │ • published_at │ │ │ │ +│ └─────────────────┘ └────────┬────────┘ └─────────────────┘ │ +│ │ │ +│ ┌────────▼────────┐ │ +│ │ ArticleMetrics │ │ +│ │ │ │ +│ │ • traffic │ │ +│ │ • social │ │ +│ │ • seo │ │ +│ └─────────────────┘ │ +│ │ +│ ┌─────────────────┐ │ +│ │ Expert │ (Standalone - for recruiting) │ +│ │ │ │ +│ │ • name │ │ +│ │ • platforms │ │ +│ │ • score │ │ +│ │ • outreach │ │ +│ └─────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────────────────────┘ +``` + +### Table Specifications + +#### `technically_topics` + +| Column | Type | Description | +|--------|------|-------------| +| id | int | Primary key | +| topic_id | str | Unique identifier (UUID) | +| source_type | str | rss, reddit, hackernews, twitter, github, manual | +| source_url | str | Original URL | +| source_name | str | Feed name, subreddit, etc. 
| +| title | str | Topic title | +| description | str | Summary/snippet | +| score_relevance | int | 0-100 | +| score_timeliness | int | 0-100 | +| score_uniqueness | int | 0-100 | +| score_audience_fit | int | 0-100 | +| score_competition | int | 0-100 | +| score_total | float | Weighted composite | +| scoring_reasoning | str | LLM explanation | +| status | enum | discovered, scored, approved, rejected, assigned, published | +| external_score | int | Upvotes, likes from source | +| tags_json | list | Topic tags | +| beat | str | AI, Cloud, Startups, etc. | +| reviewed_by | str | Editor who approved/rejected | +| reviewed_at | datetime | When reviewed | +| assigned_writer_id | str | FK to Writer | +| brief_id | str | FK to Brief | +| created_at | datetime | | +| updated_at | datetime | | + +#### `technically_topic_sources` + +| Column | Type | Description | +|--------|------|-------------| +| id | int | Primary key | +| source_id | str | Unique identifier | +| source_type | str | rss, reddit, hackernews, twitter | +| source_url | str | RSS URL, subreddit name, etc. | +| source_name | str | Human-readable name | +| enabled | bool | Is this source active? 
| +| check_interval_minutes | int | How often to check | +| last_checked_at | datetime | | +| include_keywords_json | list | Keywords to match | +| exclude_keywords_json | list | Keywords to filter out | +| min_external_score | int | Minimum upvotes/likes | +| default_beat | str | Default beat assignment | + +#### `technically_briefs` + +| Column | Type | Description | +|--------|------|-------------| +| id | int | Primary key | +| brief_id | str | Unique identifier | +| topic_id | str | FK to Topic | +| title | str | Working title | +| headline_options_json | list | 3 headline suggestions | +| story_angle | str | Unique angle | +| hook | str | Opening hook | +| key_facts_json | list | Facts with sources | +| source_links_json | list | {url, title, type} | +| competitor_coverage_json | list | What competitors wrote | +| interview_contacts_json | list | Potential sources | +| target_keywords_json | list | SEO keywords | +| suggested_word_count | int | Target length | +| content_format | str | blog_post, deep_dive, news, tutorial | +| visual_suggestions_json | list | Image/diagram ideas | +| requires_diagrams | bool | | +| requires_video | bool | | +| status | enum | draft, ready, assigned, in_progress, submitted, in_review, approved, published | +| assigned_writer_id | str | FK to Writer | +| assigned_editor_id | str | | +| google_doc_url | str | | +| asana_task_id | str | | +| due_date | datetime | | +| created_at | datetime | | +| updated_at | datetime | | + +#### `technically_writers` + +| Column | Type | Description | +|--------|------|-------------| +| id | int | Primary key | +| writer_id | str | Unique identifier | +| name | str | | +| email | str | | +| slack_user_id | str | For notifications | +| beats_json | list | ["AI", "Cloud"] | +| can_record_video | bool | Can be correspondent | +| max_weekly_assignments | int | Capacity | +| current_assignments | int | Active count | +| is_active | bool | | + +#### `technically_visual_assets` + +| Column | Type | 
Description | +|--------|------|-------------| +| id | int | Primary key | +| asset_id | str | Unique identifier | +| brief_id | str | FK to Brief | +| asset_type | enum | featured_image, hero, thumbnail, diagram, chart, social_card, video_thumbnail | +| status | enum | pending, generating, review, approved, rejected, uploaded | +| prompt | str | Generation prompt | +| mermaid_code | str | For diagrams | +| style_reference | str | Style prompt for Nano Banana | +| local_path | str | Temp path | +| cloudinary_url | str | Final URL | +| cloudinary_public_id | str | For management | +| variants_json | dict | {size_name: url} | +| width | int | | +| height | int | | +| alt_text | str | Accessibility | +| created_at | datetime | | + +#### `technically_video_projects` + +| Column | Type | Description | +|--------|------|-------------| +| id | int | Primary key | +| video_id | str | Unique identifier | +| brief_id | str | FK to Brief | +| script_content | str | Full script | +| script_approved | bool | | +| estimated_duration_seconds | int | | +| shot_list_json | list | Shot breakdown | +| diagram_asset_ids_json | list | FKs to VisualAsset | +| correspondent_id | str | FK to Writer | +| recording_scheduled_at | datetime | | +| recording_completed_at | datetime | | +| raw_footage_url | str | Cloud storage | +| edit_project_url | str | Descript link | +| status | enum | pending, scripted, scheduled, recorded, editing, review, approved, published | +| final_video_url | str | | +| youtube_video_id | str | | +| youtube_url | str | | +| created_at | datetime | | + +#### `technically_articles` + +| Column | Type | Description | +|--------|------|-------------| +| id | int | Primary key | +| article_id | str | Unique identifier | +| brief_id | str | FK to Brief | +| title | str | Final title | +| slug | str | URL slug | +| content_markdown | str | Full content | +| excerpt | str | Summary | +| meta_title | str | SEO title | +| meta_description | str | SEO description | +| 
canonical_url | str | | +| keywords_json | list | | +| categories_json | list | | +| tags_json | list | | +| featured_image_asset_id | str | FK to VisualAsset | +| video_id | str | FK to VideoProject | +| author_id | str | FK to Writer | +| author_name | str | Display name | +| status | enum | draft, scheduled, published, unpublished | +| cms_id | str | Sanity document ID | +| cms_last_synced_at | datetime | | +| scheduled_for | datetime | | +| published_at | datetime | | +| created_at | datetime | | + +#### `technically_social_posts` + +| Column | Type | Description | +|--------|------|-------------| +| id | int | Primary key | +| post_id | str | Unique identifier | +| article_id | str | FK to Article | +| platform | str | twitter, linkedin, threads | +| content | str | Post content | +| thread_json | list | For Twitter threads | +| status | enum | draft, scheduled, posted, failed | +| scheduled_for | datetime | | +| posted_at | datetime | | +| external_post_id | str | Platform's ID | +| external_url | str | Link to post | +| impressions | int | | +| engagements | int | | +| clicks | int | | +| created_at | datetime | | + +#### `technically_article_metrics` + +| Column | Type | Description | +|--------|------|-------------| +| id | int | Primary key | +| article_id | str | FK to Article | +| date | date | Metrics date | +| pageviews | int | | +| unique_visitors | int | | +| avg_time_on_page | float | seconds | +| bounce_rate | float | | +| organic_traffic | int | | +| social_traffic | int | | +| referral_traffic | int | | +| keyword_rankings_json | list | {keyword, position} | +| backlinks | int | | +| social_shares | int | | +| social_engagements | int | | +| newsletter_clicks | int | | +| video_views | int | | +| video_completion_rate | float | | + +#### `technically_experts` + +| Column | Type | Description | +|--------|------|-------------| +| id | int | Primary key | +| expert_id | str | Unique identifier | +| name | str | | +| title | str | Job title | +| 
company | str | | +| bio | str | | +| twitter_handle | str | | +| twitter_followers | int | | +| linkedin_url | str | | +| github_username | str | | +| email | str | If known | +| expertise_areas_json | list | | +| score_relevance | int | | +| score_authority | int | | +| score_accessibility | int | | +| score_total | float | | +| outreach_status | enum | none, drafted, sent, responded, converted | +| outreach_draft | str | | +| notes | str | | +| source_campaign | str | Which search found them | +| created_at | datetime | | +| updated_at | datetime | | + +--- + +## Integration Requirements + +### Extensions to Kurt's Existing Integrations + +Kurt already has `kurt integrations research` with Reddit, HN, RSS, and Perplexity. Extend with: + +| Integration | Purpose | API/Method | Priority | +|-------------|---------|------------|----------| +| **Twitter/X Signals** | Topic discovery from tech Twitter | Twitter API v2 or Apify | P1 | +| **GitHub Trending** | Trending repos, releases | GitHub API | P2 | +| **Google Alerts** | Keyword monitoring | Email parsing or RSS | P3 | + +### New Integrations Required + +#### Phase 1: Core Pipeline (Weeks 1-4) + +| Integration | Purpose | API/Method | +|-------------|---------|------------| +| **Slack** | Notifications, interactive approvals, digests | Slack SDK (slack_sdk) | +| **Google Docs** | Brief templates, fetch content | Google Docs API | +| **Asana** | Task management, status sync | Asana API | +| **Cloudinary** | Image/video asset storage, transforms | Cloudinary SDK | + +#### Phase 2: Production (Weeks 5-8) + +| Integration | Purpose | API/Method | +|-------------|---------|------------| +| **DALL-E** | Featured image generation | OpenAI Images API | +| **Mermaid CLI** | Diagram rendering | Local CLI (mmdc) | +| **Nano Banana 3** | Diagram style transfer | API (TBD) | +| **Google Calendar** | Correspondent scheduling | Google Calendar API | +| **Descript** | Video editing, transcription | Descript API | + +#### Phase 
3: Distribution (Weeks 9-10) + +| Integration | Purpose | API/Method | +|-------------|---------|------------| +| **Twitter API** | Post threads, read engagement | Twitter API v2 | +| **LinkedIn API** | Post updates, read engagement | LinkedIn Marketing API | +| **Beehiiv** | Newsletter integration | Beehiiv API | +| **YouTube** | Video upload, metadata | YouTube Data API | + +#### Phase 4: Analytics & Recruiting (Weeks 11-12) + +| Integration | Purpose | API/Method | +|-------------|---------|------------| +| **GA4** | Traffic analytics | Google Analytics Data API | +| **Search Console** | SEO metrics | Search Console API | +| **Apify** | Social scraping for expert discovery | Apify API | + +### Integration Architecture + +Each integration follows kurt's pattern: + +``` +extensions/integrations/{name}/ +├── __init__.py # Exports client class +├── client.py # API client implementation +├── config.py # Configuration schema +├── cli.py # Optional CLI commands +└── tests/ + └── test_{name}.py +``` + +**Example: Slack Integration** + +```python +# extensions/integrations/slack/client.py + +class SlackClient: + """Slack integration for newsroom notifications.""" + + def __init__(self, token: str = None): + self.client = WebClient(token=token or os.environ["SLACK_BOT_TOKEN"]) + + def post_topic_digest(self, topics: list[dict], channel: str) -> str: + """Post daily digest with interactive buttons.""" + ... + + def post_high_score_alert(self, topic: dict, channel: str) -> str: + """Post immediate alert for high-scoring topic.""" + ... + + def notify_writer(self, writer_slack_id: str, brief: dict) -> str: + """DM writer about new assignment.""" + ... + + def handle_topic_approval(self, payload: dict) -> dict: + """Handle interactive button callback.""" + ... 
+```
+
+---
+
+## CLI Commands
+
+### Topic Management
+
+```bash
+# Ingestion
+kurt topics ingest # Run ingestion now
+kurt topics ingest --source reddit # Specific source only
+kurt topics ingest --background # Run in background
+kurt topics ingest --dry-run # Preview without saving
+
+# Listing and filtering
+kurt topics list # All topics
+kurt topics list --status approved # By status
+kurt topics list --score-min 80 # By score
+kurt topics list --beat "AI" # By beat
+kurt topics list --since "2024-01-01" # By date
+
+# Actions
+kurt topics show <topic-id> # Full details
+kurt topics approve <topic-id> # Approve topic
+kurt topics approve <topic-id> --assign <writer-id> # Approve and assign
+kurt topics reject <topic-id> --reason "..." # Reject with reason
+kurt topics score <topic-id> # Re-score a topic
+
+# Sources
+kurt topics sources list # List configured sources
+kurt topics sources add --type rss --url "..." --name "..."
+kurt topics sources disable <source-id>
+```
+
+### Brief Management
+
+```bash
+# Generation
+kurt briefs generate <topic-id> # Generate brief
+kurt briefs generate <topic-id> --writer <writer-id> # Generate and assign
+kurt briefs generate <topic-id> --skip-research # Use existing research
+
+# Listing
+kurt briefs list # All briefs
+kurt briefs list --status in_progress # By status
+kurt briefs list --writer <writer-id> # By writer
+
+# Actions
+kurt briefs show <brief-id> # Full details
+kurt briefs assign <brief-id> --writer <writer-id> # Assign to writer
+kurt briefs update-status <brief-id> --status submitted
+```
+
+### Production
+
+```bash
+# Images
+kurt production image <brief-id> # Generate featured image
+kurt production image <brief-id> --type diagram --concept "..."
+kurt production image <asset-id> --regenerate # Try again
+
+# Diagrams (Mermaid → Nano Banana pipeline)
+kurt production diagram --concept "API request flow"
+kurt production diagram --mermaid-file diagram.mmd
+
+# Video
+kurt production video prepare <brief-id> # Full pre-production
+kurt production video script <brief-id> # Script only
+kurt production video schedule <video-id> --correspondent <writer-id> --time "..."
+
+# Assets
+kurt production assets list # List all assets
+kurt production assets upload --file local.png --type diagram
+```
+
+### Publishing
+
+```bash
+# CMS
+kurt publish sync <article-id> # Push to CMS
+kurt publish preview <article-id> # Get preview URL
+kurt publish schedule <article-id> --time "2024-01-20T09:00:00"
+kurt publish now <article-id> # Publish immediately
+
+# Distribution
+kurt publish distribute <article-id> # Full distribution
+kurt publish distribute <article-id> --platforms twitter,linkedin
+kurt publish twitter <article-id> # Twitter only
+kurt publish linkedin <article-id> # LinkedIn only
+```
+
+### Analytics
+
+```bash
+# Sync
+kurt analytics sync # Sync all sources
+kurt analytics sync --source ga4 # Specific source
+kurt analytics sync --since "2024-01-01" # Date range
+
+# Reports
+kurt analytics report # Weekly report
+kurt analytics report --period month # Monthly
+kurt analytics report --article <article-id> # Single article
+
+# Performance
+kurt analytics top --period week --limit 10 # Top performers
+kurt analytics underperforming # Articles below expectations
+```
+
+### Recruiting
+
+```bash
+# Discovery
+kurt experts find --topic "AI agents" --platforms twitter,linkedin
+kurt experts find --beat "Cloud" --limit 50
+
+# Management
+kurt experts list # All experts
+kurt experts list --status drafted # By outreach status
+kurt experts show <expert-id> # Full profile
+
+# Outreach
+kurt experts draft-outreach <expert-id> # Generate outreach
+kurt experts mark-sent <expert-id> # Mark as sent
+kurt experts mark-responded <expert-id> # Mark as responded
+```
+
+---
+
+## Technical Architecture
+
+### Project Structure
+
+```
+technically/
+├── kurt.config # Kurt configuration
+├── .kurt/
+│ └── kurt.sqlite # Database
+├── .agents/
+│ └── AGENTS.md # Agent instructions
+├── workflows/ # Agent workflows (.md)
+│ ├── topic-research.md
+│ ├── competitor-analysis.md
+│ └── expert-outreach.md
+│
+├── extensions/ # Python extensions
+│ ├── models/
+│ │ ├── __init__.py
+│ │ ├── topics.py
+│ │ ├── briefs.py
+│ │ ├── production.py
+│ │ ├── publishing.py
+│ │ └── recruiting.py
+│ │
+│ ├── workflows/
+│ 
│ ├── topics/ +│ │ │ ├── config.py +│ │ │ ├── steps.py +│ │ │ └── workflow.py +│ │ ├── briefs/ +│ │ ├── production/ +│ │ ├── publishing/ +│ │ ├── distribution/ +│ │ ├── analytics/ +│ │ └── recruiting/ +│ │ +│ └── integrations/ +│ ├── slack/ +│ ├── google_docs/ +│ ├── asana/ +│ ├── cloudinary/ +│ ├── dalle/ +│ ├── mermaid/ +│ ├── nano_banana/ +│ ├── twitter/ +│ ├── linkedin/ +│ ├── youtube/ +│ ├── ga4/ +│ ├── search_console/ +│ └── apify/ +│ +├── sources/ # Fetched content (kurt default) +└── projects/ # Writing projects (kurt default) +``` + +### Workflow Execution Model + +``` +┌─────────────────────────────────────────────────────────────────────────────────┐ +│ EXECUTION MODEL │ +├─────────────────────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌─────────────────────────────────────────────────────────────────────────┐ │ +│ │ DBOS WORKFLOW │ │ +│ │ │ │ +│ │ @DBOS.workflow() │ │ +│ │ def topic_ingestion_workflow(config): │ │ +│ │ │ │ │ +│ │ ├── fetch_rss_step() # DBOS.step - Pure Python │ │ +│ │ ├── fetch_reddit_step() # DBOS.step - Pure Python │ │ +│ │ ├── dedupe_step() # DBOS.step - Pure Python │ │ +│ │ │ │ │ +│ │ ├── score_topics_step.run() # LLMStep - Batch LLM │ │ +│ │ │ └── Queue (concurrency=5) │ │ +│ │ │ ├── process_row() → LLM call │ │ +│ │ │ ├── process_row() → LLM call │ │ +│ │ │ └── ... 
│ │ +│ │ │ │ │ +│ │ ├── persist_topics() # DBOS.transaction - DB write │ │ +│ │ │ │ │ +│ │ └── notify_slack_step() # DBOS.step - Pure Python │ │ +│ │ │ │ +│ └─────────────────────────────────────────────────────────────────────────┘ │ +│ │ │ +│ │ Durability │ +│ ▼ │ +│ ┌─────────────────────────────────────────────────────────────────────────┐ │ +│ │ POSTGRES (via DBOS) │ │ +│ │ │ │ +│ │ • Workflow state │ │ +│ │ • Step checkpoints │ │ +│ │ • Event streams │ │ +│ │ • Recovery on failure │ │ +│ └─────────────────────────────────────────────────────────────────────────┘ │ +│ │ +│ ┌─────────────────────────────────────────────────────────────────────────┐ │ +│ │ SQLITE (kurt) │ │ +│ │ │ │ +│ │ • Domain models (topics, briefs, articles, etc.) │ │ +│ │ • LLM traces (costs, tokens) │ │ +│ │ • Integration configs │ │ +│ └─────────────────────────────────────────────────────────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────────────────────┘ +``` + +--- + +## Implementation Phases + +### Phase 1: Foundation (Weeks 1-2) +**Goal:** Topic ingestion working end-to-end + +- [ ] Set up project structure with extensions folder +- [ ] Implement topic models +- [ ] Extend kurt signals with Twitter/GitHub +- [ ] Implement topic scoring LLMStep +- [ ] Build Slack integration (notifications, buttons) +- [ ] CLI: `kurt topics` commands + +**Deliverable:** Daily topic digest in Slack with approve/reject + +### Phase 2: Brief Pipeline (Weeks 3-4) +**Goal:** Topic → Brief → Writer assignment + +- [ ] Implement brief models +- [ ] Build research agent workflow +- [ ] Implement brief generation LLMStep +- [ ] Build Google Docs integration +- [ ] Build Asana integration +- [ ] CLI: `kurt briefs` commands + +**Deliverable:** Approved topic auto-generates brief, creates doc, assigns writer + +### Phase 3: Visual Production (Weeks 5-6) +**Goal:** Automated image and diagram generation + +- [ ] Implement visual asset models +- [ ] Build Cloudinary 
integration +- [ ] Build DALL-E integration +- [ ] Build Mermaid CLI integration +- [ ] Build Nano Banana 3 integration +- [ ] CLI: `kurt production image/diagram` commands + +**Deliverable:** Brief triggers asset generation, uploads to DAM + +### Phase 4: Video Pipeline (Weeks 7-8) +**Goal:** Video pre-production automated + +- [ ] Implement video project models +- [ ] Build script generation LLMStep +- [ ] Build shot list generation +- [ ] Build Google Calendar integration +- [ ] Build correspondent notification flow +- [ ] CLI: `kurt production video` commands + +**Deliverable:** Brief with video flag triggers full pre-prod package + +### Phase 5: Publishing (Weeks 9-10) +**Goal:** Draft → CMS → Publish + +- [ ] Implement article models +- [ ] Extend kurt's Sanity integration +- [ ] Build SEO metadata generation +- [ ] Build publishing workflow +- [ ] CLI: `kurt publish` commands + +**Deliverable:** Approved content auto-syncs to CMS, schedules publish + +### Phase 6: Distribution (Weeks 11-12) +**Goal:** Automated social amplification + +- [ ] Implement social post models +- [ ] Build Twitter integration +- [ ] Build LinkedIn integration +- [ ] Build newsletter integration +- [ ] Build distribution workflow +- [ ] CLI: `kurt publish distribute` commands + +**Deliverable:** Published article triggers coordinated social distribution + +### Phase 7: Analytics & Recruiting (Weeks 13-14) +**Goal:** Performance tracking and feedback loop + +- [ ] Implement metrics models +- [ ] Build GA4 integration +- [ ] Build Search Console integration +- [ ] Build weekly report workflow +- [ ] Implement expert models +- [ ] Build Apify integration for social scraping +- [ ] Build recruiting workflow +- [ ] CLI: `kurt analytics` and `kurt experts` commands + +**Deliverable:** Weekly performance reports, expert discovery pipeline + +--- + +## Success Metrics + +| Metric | Target | Measurement | +|--------|--------|-------------| +| Topics surfaced per day | 50-100 | Count in DB | 
+| Topic → Brief time | < 2 hours | Workflow duration | +| Brief → Published time | < 5 days | Status timestamps | +| Visual asset generation | < 5 min | Workflow duration | +| Distribution coverage | 100% of articles | Social posts created | +| Scoring accuracy | 70%+ correlation | Predicted vs actual traffic | +| Cost per article | < $5 in LLM | LLM traces sum | + +--- + +## Open Questions + +1. **Nano Banana 3 API** — What's the exact API interface for style transfer? +2. **Video editing** — Descript API vs. manual editing workflow? +3. **Newsletter platform** — Beehiiv, Substack, or custom? +4. **Expert outreach** — Automated DMs or manual send? +5. **Multi-tenant** — Single workspace or support for multiple publications? + +--- + +*Document Version: 1.0* +*Last Updated: January 2026* +*For: Technically Newsroom Project*