diff --git a/openkiln/commands/skill.py b/openkiln/commands/skill.py index 3e8aa9a..dc3d1a8 100644 --- a/openkiln/commands/skill.py +++ b/openkiln/commands/skill.py @@ -291,6 +291,7 @@ def _append_config_section(skill_name: str) -> None: config_templates: dict[str, str] = { "orbisearch": ('\n[skills.orbisearch]\napi_key = "" # get your free key at orbisearch.com\n'), "smartlead": ('\n[skills.smartlead]\napi_key = ""\n'), + "cleanco": ('\n[skills.cleanco]\napi_key = "" # OpenAI API key\n'), } template = config_templates.get(skill_name) diff --git a/openkiln/skills/cleanco/SKILL.md b/openkiln/skills/cleanco/SKILL.md new file mode 100644 index 0000000..6e431b1 --- /dev/null +++ b/openkiln/skills/cleanco/SKILL.md @@ -0,0 +1,117 @@ +# Cleanco Skill + +Clean company names for outreach using OpenAI. + +## Provides + +| Type | Name | Description | +|-----------|-----------------------|---------------------------------------| +| transform | cleanco.company_name | Clean company names for cold email | + +## Required config +```bash +# Set via environment variable (recommended) +export OPENAI_API_KEY=your-key-here + +# Or via config file (~/.openkiln/config.toml) +[skills.cleanco] +api_key = "your-key-here" +``` + +## What it cleans + +- Legal suffixes: Inc, LLC, Ltd, GmbH, Corp, PLC, etc. +- Parenthetical descriptions: "Tiger Data (creators of TimescaleDB)" -> "Tiger Data" +- Pipe-separated taglines: "Rocketship | Digital Marketing Agency" -> "Rocketship" +- Colon-separated taglines: "eyreACT: AI Act Compliance Platform" -> "eyreACT" +- Preserves brand names: "1Password", "6sense" stay unchanged + +Uses gpt-4o-mini for intelligent cleaning. Results are cached in +cleanco.db so each unique name is only cleaned once. + +## CLI Commands + +### clean + +Clean company names in a CSV file. + +```bash +# dry run — shows what would be cleaned +openkiln cleanco clean contacts.csv + +# clean and write output (defaults to contacts-cleaned.csv) +openkiln cleanco clean contacts.csv --apply + +# specify column and output path +openkiln cleanco clean contacts.csv --column company_name --output cleaned.csv --apply + +# JSON output +openkiln cleanco clean contacts.csv --apply --json +``` + +**Flags:** +- `--column`, `-c` — Column name containing company names (default: `company_name`) +- `--output`, `-o` — Output file path (default: `-cleaned.csv`) +- `--apply` — Actually clean and write output (default: dry run) +- `--json` — Output as JSON + +### cache + +Show cache statistics. + +```bash +openkiln cleanco cache +openkiln cleanco cache --json +``` + +### show + +Show cached name changes (where original differs from cleaned). + +```bash +openkiln cleanco show +openkiln cleanco show --limit 50 +openkiln cleanco show --json +``` + +**Flags:** +- `--limit`, `-n` — Number of entries to show (default: 20) +- `--json` — Output as JSON + +## Example workflow usage + +### Pre-import (CLI) +```bash +openkiln cleanco clean contacts.csv --apply +openkiln record import contacts-cleaned.csv --type contact --skill crm --apply +``` + +### Post-import (workflow) +```yaml +name: clean-validate-push +requires: + - crm + - cleanco + - orbisearch + - smartlead + +source: + skill: crm + type: contacts + filter: + segment: clay-gtm-ops + +transforms: + - cleanco.company_name + - orbisearch.validate + +filter: + status: safe + +sinks: + - skill: crm + action: update + - skill: smartlead + action: push + campaign_id: "3133669" +``` diff --git a/openkiln/skills/cleanco/__init__.py b/openkiln/skills/cleanco/__init__.py new file mode 100644 index 0000000..60c1aa4 --- /dev/null +++ b/openkiln/skills/cleanco/__init__.py @@ -0,0 +1,4 @@ +# openkiln/skills/cleanco/ +# Company name cleaning via OpenAI. + +__version__ = "0.1.0" diff --git a/openkiln/skills/cleanco/api.py b/openkiln/skills/cleanco/api.py new file mode 100644 index 0000000..e2eab0b --- /dev/null +++ b/openkiln/skills/cleanco/api.py @@ -0,0 +1,126 @@ +""" +Company name cleaning via OpenAI. + +Uses gpt-4o-mini to clean company names for cold email outreach. +Batches names for efficiency. +""" + +from __future__ import annotations + +import json +import os + +import httpx + +from openkiln import config + +BASE_URL = "https://api.openai.com/v1" +REQUEST_TIMEOUT = 30.0 +MODEL = "gpt-4o-mini" + +SYSTEM_PROMPT = """\ +You clean company names for use in cold email outreach. + +Rules: +- Remove legal suffixes: Inc, Inc., LLC, Ltd, Ltd., Limited, GmbH, Corp, \ +Corp., Corporation, PLC, AG, SA, SAS, BV, NV, Pty, Co., Company, Group +- Remove parenthetical descriptions: "Tiger Data (creators of TimescaleDB)" -> "Tiger Data" +- Remove pipe-separated taglines: "Rocketship | Digital Marketing Agency" -> "Rocketship" +- Remove colon-separated taglines: "eyreACT: AI Act Compliance Platform" -> "eyreACT" +- Keep the core brand name as it would appear in casual business conversation +- Preserve capitalisation and special characters that are part of the brand +- If the entire name IS the brand (e.g. "1Password", "6sense"), return it unchanged +- If removing a suffix leaves nothing meaningful, keep the original + +Return a JSON array of cleaned names in the same order as the input. +No explanations, just the JSON array.\ +""" + + +class CleancoError(Exception): + """Base error for cleanco failures.""" + + def __init__(self, message: str, status_code: int | None = None) -> None: + self.status_code = status_code + super().__init__(message) + + +class CleancoClient: + """Cleans company names via OpenAI.""" + + def __init__(self, api_key: str) -> None: + self._api_key = api_key + + def clean_batch(self, names: list[str]) -> list[str]: + """Clean a batch of company names. Returns cleaned names in order.""" + if not names: + return [] + + user_msg = json.dumps(names) + + response = httpx.post( + f"{BASE_URL}/chat/completions", + headers={ + "Authorization": f"Bearer {self._api_key}", + "Content-Type": "application/json", + }, + json={ + "model": MODEL, + "messages": [ + {"role": "system", "content": SYSTEM_PROMPT}, + {"role": "user", "content": user_msg}, + ], + "temperature": 0, + }, + timeout=REQUEST_TIMEOUT, + ) + + if response.status_code >= 400: + raise CleancoError( + f"OpenAI API error: {response.text[:200]}", + response.status_code, + ) + + data = response.json() + content = data["choices"][0]["message"]["content"].strip() + + # Parse the JSON array from the response + # Strip markdown code fences if present + if content.startswith("```"): + content = content.split("\n", 1)[1] + content = content.rsplit("```", 1)[0].strip() + + try: + cleaned = json.loads(content) + except json.JSONDecodeError: + raise CleancoError(f"Could not parse OpenAI response: {content[:200]}") + + if not isinstance(cleaned, list) or len(cleaned) != len(names): + got = len(cleaned) if isinstance(cleaned, list) else "non-list" + raise CleancoError(f"Expected {len(names)} names, got {got}") + + return cleaned + + +def _resolve_api_key() -> str: + """Resolve API key from environment or config.""" + key = os.environ.get("OPENAI_API_KEY") + if key: + return key + + cfg = config.get() + key = cfg.skill_config("cleanco").get("api_key", "") + if key: + return key + + raise CleancoError( + "No OpenAI API key configured.\n" + "Set OPENAI_API_KEY or add it to ~/.openkiln/config.toml:\n" + " [skills.cleanco]\n" + ' api_key = "your-key-here"' + ) + + +def get_client() -> CleancoClient: + """Returns a client using the configured API key.""" + return CleancoClient(_resolve_api_key()) diff --git a/openkiln/skills/cleanco/cli.py b/openkiln/skills/cleanco/cli.py new file mode 100644 index 0000000..5ef22fc --- /dev/null +++ b/openkiln/skills/cleanco/cli.py @@ -0,0 +1,220 @@ +from __future__ import annotations + +import csv +import json +from pathlib import Path + +import typer +from rich import print as rprint +from rich.console import Console + +from openkiln.skills.cleanco import queries +from openkiln.skills.cleanco.api import CleancoError, get_client + +app = typer.Typer( + name="cleanco", + help="Cleanco -- clean company names for outreach.", + no_args_is_help=True, +) + +console = Console() + +BATCH_SIZE = 50 + + +def _handle_api_error(e: CleancoError) -> None: + """Print an API error and exit.""" + rprint(f"[red]\u2717 {e}[/red]") + raise typer.Exit(code=1) + + +@app.command("clean") +def clean( + file: Path = typer.Argument( + ..., + help="CSV file to clean.", + exists=True, + readable=True, + ), + column: str = typer.Option( + "company_name", + "--column", + "-c", + help="Column name containing company names.", + ), + output: Path = typer.Option( + None, + "--output", + "-o", + help="Output file path. Defaults to -cleaned.csv.", + ), + apply: bool = typer.Option( + False, + "--apply", + help="Actually write the output file. Default is dry run.", + ), + output_json: bool = typer.Option(False, "--json", help="Output as JSON."), +) -> None: + """Clean company names in a CSV file. + + Reads the CSV, cleans the specified column using OpenAI, + and writes the result. Uses a local cache to avoid re-cleaning + names already processed. + """ + with open(file, newline="", encoding="utf-8") as f: + reader = csv.DictReader(f) + if column not in (reader.fieldnames or []): + rprint(f"[red]\u2717 Column '{column}' not found in {file}[/red]") + rprint(f" Available columns: {', '.join(reader.fieldnames or [])}") + raise typer.Exit(code=1) + rows = list(reader) + fieldnames = reader.fieldnames + + # Collect unique names + all_names = list(dict.fromkeys(r[column].strip() for r in rows if r.get(column, "").strip())) + + # Check cache + cached = queries.get_cached(all_names) + uncached = [n for n in all_names if n not in cached] + + console.print(f" Total rows: {len(rows):>6}") + console.print(f" Unique names: {len(all_names):>6}") + console.print(f" Cached: {len(cached):>6}") + console.print(f" To clean: {len(uncached):>6}") + + if not apply: + console.print("\n [yellow]Dry run.[/yellow] Use --apply to clean and write output.") + if output_json: + typer.echo( + json.dumps( + { + "rows": len(rows), + "unique_names": len(all_names), + "cached": len(cached), + "to_clean": len(uncached), + }, + indent=2, + ) + ) + return + + # Clean uncached names via API + new_mappings: dict[str, str] = {} + if uncached: + try: + client = get_client() + except CleancoError as e: + _handle_api_error(e) + + for i in range(0, len(uncached), BATCH_SIZE): + batch = uncached[i : i + BATCH_SIZE] + try: + cleaned = client.clean_batch(batch) + except CleancoError as e: + _handle_api_error(e) + for orig, clean in zip(batch, cleaned): + new_mappings[orig] = clean + console.print(f" Cleaned {min(i + BATCH_SIZE, len(uncached))}/{len(uncached)}...") + + queries.cache_results(new_mappings) + + # Merge all mappings + all_mappings = {**cached, **new_mappings} + + # Apply to rows + changed = 0 + for row in rows: + name = row.get(column, "").strip() + if name and name in all_mappings and all_mappings[name] != name: + row[column] = all_mappings[name] + changed += 1 + + # Write output + out_path = output or file.with_stem(f"{file.stem}-cleaned") + with open(out_path, "w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(rows) + + console.print(f"\n [green]\u2713[/green] Cleaned {changed} names") + console.print(f" Output: {out_path}") + + if output_json: + typer.echo( + json.dumps( + { + "rows": len(rows), + "cleaned": changed, + "unchanged": len(rows) - changed, + "output": str(out_path), + }, + indent=2, + ) + ) + + +@app.command("cache") +def cache_stats( + output_json: bool = typer.Option(False, "--json", help="Output as JSON."), +) -> None: + """Show cache statistics.""" + import sqlite3 + + from openkiln import config + + db_path = config.get().skill_db_path("cleanco") + conn = sqlite3.connect(db_path) + try: + count = conn.execute("SELECT COUNT(*) FROM cleaned_names").fetchone()[0] + changed = conn.execute( + "SELECT COUNT(*) FROM cleaned_names WHERE original != cleaned" + ).fetchone()[0] + finally: + conn.close() + + if output_json: + typer.echo(json.dumps({"cached": count, "changed": changed}, indent=2)) + return + + console.print(f" Cached names: {count}") + console.print(f" Were changed: {changed}") + console.print(f" Unchanged: {count - changed}") + + +@app.command("show") +def show_changes( + limit: int = typer.Option(20, "--limit", "-n", help="Number of entries."), + output_json: bool = typer.Option(False, "--json", help="Output as JSON."), +) -> None: + """Show cached name changes (where original differs from cleaned).""" + import sqlite3 + + from openkiln import config + + db_path = config.get().skill_db_path("cleanco") + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row + try: + rows = conn.execute( + "SELECT original, cleaned FROM cleaned_names " + "WHERE original != cleaned ORDER BY cleaned_at DESC LIMIT ?", + (limit,), + ).fetchall() + finally: + conn.close() + + if output_json: + typer.echo( + json.dumps( + [{"original": r["original"], "cleaned": r["cleaned"]} for r in rows], + indent=2, + ) + ) + return + + if not rows: + console.print(" No changes recorded.") + return + + for row in rows: + console.print(f' "{row["original"]}" [dim]\u2192[/dim] "{row["cleaned"]}"') diff --git a/openkiln/skills/cleanco/queries.py b/openkiln/skills/cleanco/queries.py new file mode 100644 index 0000000..809d762 --- /dev/null +++ b/openkiln/skills/cleanco/queries.py @@ -0,0 +1,56 @@ +""" +Cleanco database queries. + +Caches cleaned company names to avoid redundant API calls. +""" + +from __future__ import annotations + +import sqlite3 + +from openkiln import config + + +def _connection() -> sqlite3.Connection: + """Opens a connection to cleanco.db.""" + db_path = config.get().skill_db_path("cleanco") + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row + return conn + + +def get_cached(names: list[str]) -> dict[str, str]: + """Look up cached cleaned names. Returns {original: cleaned}.""" + if not names: + return {} + conn = _connection() + try: + placeholders = ",".join("?" for _ in names) + rows = conn.execute( + f"SELECT original, cleaned FROM cleaned_names WHERE original IN ({placeholders})", + names, + ).fetchall() + return {row["original"]: row["cleaned"] for row in rows} + finally: + conn.close() + + +def cache_results(mappings: dict[str, str]) -> None: + """Store cleaned name mappings in the cache.""" + if not mappings: + return + conn = _connection() + try: + conn.executemany( + """ + INSERT INTO cleaned_names (original, cleaned) + VALUES (?, ?) + ON CONFLICT(original) DO UPDATE SET + cleaned = excluded.cleaned, + cleaned_at = datetime('now') + """, + list(mappings.items()), + ) + conn.commit() + finally: + conn.close() diff --git a/openkiln/skills/cleanco/schema/001_initial.sql b/openkiln/skills/cleanco/schema/001_initial.sql new file mode 100644 index 0000000..e817f91 --- /dev/null +++ b/openkiln/skills/cleanco/schema/001_initial.sql @@ -0,0 +1,21 @@ +-- cleanco.db +-- OpenKiln cleanco skill schema v1 +-- +-- Caches cleaned company names to avoid re-calling the API +-- for names already processed. + +-- ============================================================ +-- CLEANED NAMES +-- One row per unique original company name. +-- Cache lookup: SELECT cleaned FROM cleaned_names WHERE original = ? +-- ============================================================ + +CREATE TABLE IF NOT EXISTS cleaned_names ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + original TEXT NOT NULL UNIQUE, + cleaned TEXT NOT NULL, + cleaned_at TEXT NOT NULL DEFAULT (datetime('now')) +); + +CREATE INDEX IF NOT EXISTS idx_cleaned_names_original + ON cleaned_names (original); diff --git a/openkiln/skills/cleanco/skill.toml b/openkiln/skills/cleanco/skill.toml new file mode 100644 index 0000000..2e14748 --- /dev/null +++ b/openkiln/skills/cleanco/skill.toml @@ -0,0 +1,21 @@ +[skill] +name = "cleanco" +version = "0.1.0" +description = "Clean company names for outreach using OpenAI." +author = "OrbiSearch" + +[[skill.provides]] +type = "transform" +name = "cleanco.company_name" +description = "Clean company names for use in cold email" +module = "openkiln.skills.cleanco.workflow" +class = "CleanCompanyName" + +[skill.requires] +skills = [] + +[[skill.config]] +key = "OPENAI_API_KEY" +env_var = "OPENAI_API_KEY" +config_path = "skills.cleanco.api_key" +required = true diff --git a/openkiln/skills/cleanco/workflow.py b/openkiln/skills/cleanco/workflow.py new file mode 100644 index 0000000..d9565ae --- /dev/null +++ b/openkiln/skills/cleanco/workflow.py @@ -0,0 +1,87 @@ +"""Cleanco workflow transform.""" + +from __future__ import annotations + +from openkiln.core import Transform +from openkiln.skills.cleanco import queries +from openkiln.skills.cleanco.api import get_client + +BATCH_SIZE = 50 + + +class CleanCompanyName(Transform): + """Clean company names using OpenAI. Caches results in cleanco.db.""" + + def __init__(self) -> None: + self._client = get_client() + self._pending: list[dict] = [] + self._cache: dict[str, str] = {} + + def apply(self, row: dict) -> dict | None: + """Buffer rows and clean in batches.""" + name = row.get("company_name", "").strip() + if not name: + return row + + # Check cache first + if name in self._cache: + row["company_name"] = self._cache[name] + return row + + # Check DB cache + cached = queries.get_cached([name]) + if name in cached: + self._cache[name] = cached[name] + row["company_name"] = cached[name] + return row + + # Need to clean via API — do it immediately for workflow compatibility + # (Transform.apply processes one row at a time) + self._pending.append(row) + + if len(self._pending) >= BATCH_SIZE: + self._flush_batch() + + # Return row — company_name will be updated when batch flushes + # For single-row processing, flush immediately + if len(self._pending) < BATCH_SIZE: + self._flush_batch() + + return row + + def _flush_batch(self) -> None: + """Clean all pending names via API and update rows.""" + if not self._pending: + return + + names = [r.get("company_name", "") for r in self._pending] + unique_names = list(dict.fromkeys(names)) # dedupe, preserve order + + # Check DB cache for batch + cached = queries.get_cached(unique_names) + uncached = [n for n in unique_names if n not in cached] + + # Clean uncached names via API + new_mappings: dict[str, str] = {} + if uncached: + # Process in sub-batches of BATCH_SIZE + for i in range(0, len(uncached), BATCH_SIZE): + batch = uncached[i : i + BATCH_SIZE] + cleaned = self._client.clean_batch(batch) + for orig, clean in zip(batch, cleaned): + new_mappings[orig] = clean + + # Cache new results + queries.cache_results(new_mappings) + + # Merge all mappings + all_mappings = {**cached, **new_mappings} + self._cache.update(all_mappings) + + # Update pending rows + for row in self._pending: + name = row.get("company_name", "") + if name in all_mappings: + row["company_name"] = all_mappings[name] + + self._pending.clear()