From dff0e5826c9a9ea50ffd1ac1498bc565a6309313 Mon Sep 17 00:00:00 2001 From: Forge Date: Sun, 15 Feb 2026 15:40:59 +0100 Subject: [PATCH 1/3] Add parallel time-range sync script for Goldsky - Splits time range into N segments, syncs in parallel with ThreadPoolExecutor - Sticky cursor optimization: skips follow-up batches when boundary count < threshold - Graceful shutdown on SIGINT/SIGTERM, per-worker logging - Tested: 2 workers, 36k records in 28.6s (1273 rec/s) --- parallel_sync.py | 354 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 354 insertions(+) create mode 100644 parallel_sync.py diff --git a/parallel_sync.py b/parallel_sync.py new file mode 100644 index 0000000..a5cdb1d --- /dev/null +++ b/parallel_sync.py @@ -0,0 +1,354 @@ +#!/usr/bin/env python3 +""" +Parallel time-range sync for Polymarket orderFilled events via Goldsky. + +Splits the time range (last_synced → now) into N segments, runs parallel +workers to sync each segment, then merges results into the main CSV. + +Usage: + python3 parallel_sync.py --workers 5 + python3 parallel_sync.py --workers 2 --end-ts 1767000000 # test with small range +""" + +import argparse +import json +import os +import signal +import subprocess +import sys +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime, timezone +from threading import Event + +import requests + +# ─── Config ─────────────────────────────────────────────────────────────────── + +QUERY_URL = "https://api.goldsky.com/api/public/project_cl6mb8i9h0003e201j6li0diw/subgraphs/orderbook-subgraph/0.0.1/gn" +BATCH_SIZE = 1000 +STICKY_THRESHOLD = 100 # skip sticky follow-up if < this many records expected +COLUMNS = ['timestamp', 'maker', 'makerAssetId', 'makerAmountFilled', + 'taker', 'takerAssetId', 'takerAmountFilled', 'transactionHash'] + +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +MAIN_CSV = os.path.join(BASE_DIR, 'goldsky', 'orderFilled.csv') +CURSOR_FILE = os.path.join(BASE_DIR, 'goldsky', 'cursor_state.json') +TEMP_DIR = os.path.join(BASE_DIR, 'goldsky', 'parallel_segments') +LOG_DIR = os.path.join(BASE_DIR, 'parallel_logs') + +# Global shutdown event +shutdown_event = Event() + + +# ─── Helpers ────────────────────────────────────────────────────────────────── + +def ts_to_str(ts): + return datetime.fromtimestamp(ts, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC') + + +def get_last_timestamp(): + """Read last synced timestamp from cursor file or CSV tail.""" + if os.path.isfile(CURSOR_FILE): + try: + with open(CURSOR_FILE) as f: + state = json.load(f) + ts = state.get('last_timestamp', 0) + if ts > 0: + return ts + except Exception: + pass + + if os.path.isfile(MAIN_CSV): + try: + result = subprocess.run(['tail', '-1', MAIN_CSV], + capture_output=True, text=True, timeout=10) + line = result.stdout.strip() + if line and not line.startswith('timestamp'): + return int(line.split(',')[0]) + except Exception: + pass + return 0 + + +def goldsky_query(session, where_clause, at_once=BATCH_SIZE): + """Execute a single Goldsky GraphQL query. 
Returns list of events.""" + query = f'''{{ + orderFilledEvents( + orderBy: timestamp, orderDirection: asc, + first: {at_once}, + where: {{{where_clause}}} + ) {{ + id timestamp maker makerAmountFilled makerAssetId + taker takerAmountFilled takerAssetId transactionHash + }} + }}''' + + for attempt in range(5): + if shutdown_event.is_set(): + return [] + try: + resp = session.post(QUERY_URL, json={'query': query}, timeout=30) + resp.raise_for_status() + data = resp.json() + if 'errors' in data: + raise RuntimeError(data['errors']) + return data.get('data', {}).get('orderFilledEvents', []) + except Exception as e: + if shutdown_event.is_set(): + return [] + wait = min(2 ** attempt, 30) + print(f" [retry {attempt+1}] {e} — waiting {wait}s") + time.sleep(wait) + return [] + + +# ─── Worker ─────────────────────────────────────────────────────────────────── + +def sync_segment(worker_id, start_ts, end_ts): + """ + Sync all orderFilled events in (start_ts, end_ts] to a temp CSV. + Returns (worker_id, record_count, output_path). + """ + os.makedirs(TEMP_DIR, exist_ok=True) + os.makedirs(LOG_DIR, exist_ok=True) + + out_path = os.path.join(TEMP_DIR, f'segment_{worker_id}.csv') + log_path = os.path.join(LOG_DIR, f'worker_{worker_id}.log') + + log = open(log_path, 'w') + csv_f = open(out_path, 'w') + session = requests.Session() + + def logmsg(msg): + line = f"[W{worker_id}] {msg}" + log.write(line + '\n') + log.flush() + print(line) + + logmsg(f"Range: {ts_to_str(start_ts)} → {ts_to_str(end_ts)}") + + last_ts = start_ts + last_id = None + sticky_ts = None + total = 0 + batches = 0 + + try: + while not shutdown_event.is_set(): + # Build where clause + if sticky_ts is not None: + where = f'timestamp: "{sticky_ts}", id_gt: "{last_id}"' + else: + where = f'timestamp_gt: "{last_ts}", timestamp_lte: "{end_ts}"' + + events = goldsky_query(session, where) + if not events: + if sticky_ts is not None: + # Done with sticky, advance + last_ts = sticky_ts + sticky_ts = None + last_id = None + continue + break + + events.sort(key=lambda e: (int(e['timestamp']), e['id'])) + n = len(events) + batch_last_ts = int(events[-1]['timestamp']) + batch_first_ts = int(events[0]['timestamp']) + batches += 1 + + # Write to CSV + seen = set() + for ev in events: + if ev['id'] in seen: + continue + seen.add(ev['id']) + csv_f.write(','.join(str(ev.get(c, '')) for c in COLUMNS) + '\n') + csv_f.flush() + total += len(seen) + + # Cursor logic with sticky optimization + if n >= BATCH_SIZE: + if batch_first_ts == batch_last_ts: + # All same timestamp — must paginate within it + sticky_ts = batch_last_ts + last_id = events[-1]['id'] + logmsg(f"Batch {batches}: ts={batch_last_ts} n={n} [STICKY-SAME]") + else: + # Full batch, mixed timestamps. Check if sticky follow-up is worth it. + # Instead of going sticky on every full batch, just advance past the + # second-to-last timestamp to avoid tiny follow-ups. 
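+                    # e.g. a full 1000-row batch whose final timestamp holds only a few
+                    # rows: advance to the previous timestamp and let the next page
+                    # cover that boundary second, rather than issuing a sticky,
+                    # id-paginated follow-up that returns almost nothing. Only when
+                    # boundary_count reaches STICKY_THRESHOLD is the sticky pass worth it.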
+ # Find the last "complete" timestamp (one before the boundary) + boundary_ts = batch_last_ts + safe_ts = batch_first_ts + for ev in events: + t = int(ev['timestamp']) + if t < boundary_ts: + safe_ts = t + + # Count how many events are at the boundary timestamp + boundary_count = sum(1 for ev in events if int(ev['timestamp']) == boundary_ts) + + if boundary_count >= STICKY_THRESHOLD: + # Many events at boundary — go sticky to get them all + sticky_ts = boundary_ts + last_id = events[-1]['id'] + logmsg(f"Batch {batches}: ts={batch_first_ts}-{batch_last_ts} n={n} [STICKY bc={boundary_count}]") + else: + # Few events at boundary — just advance past the safe timestamp + # We already have all events up to boundary_ts from this batch + last_ts = safe_ts + sticky_ts = None + last_id = None + logmsg(f"Batch {batches}: ts={batch_first_ts}-{batch_last_ts} n={n} [SKIP-STICKY bc={boundary_count}]") + else: + # Not full — all events retrieved + if sticky_ts is not None: + last_ts = sticky_ts + sticky_ts = None + last_id = None + logmsg(f"Batch {batches}: ts={batch_last_ts} n={n} [STICKY-DONE]") + else: + last_ts = batch_last_ts + logmsg(f"Batch {batches}: ts={batch_last_ts} n={n}") + + if n < BATCH_SIZE and sticky_ts is None: + # Might be more data — only break if we've reached end_ts + if batch_last_ts >= end_ts: + break + # Otherwise keep going (sparse data region) + + except Exception as e: + logmsg(f"ERROR: {e}") + + logmsg(f"Done: {total} records in {batches} batches") + csv_f.close() + log.close() + session.close() + + return worker_id, total, out_path + + +# ─── Merge ──────────────────────────────────────────────────────────────────── + +def merge_segments(segment_files, record_counts): + """Merge temp segment CSVs into the main orderFilled.csv in order.""" + # Segments are already in timestamp order (worker 0 = earliest range) + # Just append them in order + print(f"\nMerging {len(segment_files)} segments into {MAIN_CSV}...") + + total_new = 0 + with open(MAIN_CSV, 'a') as out: + for wid in sorted(segment_files.keys()): + path = segment_files[wid] + if not os.path.isfile(path): + continue + count = 0 + with open(path) as f: + for line in f: + line = line.strip() + if line: + out.write(line + '\n') + count += 1 + total_new += count + print(f" Segment {wid}: {count} records appended") + + # Update cursor state + if total_new > 0: + # Get last timestamp from merged data + result = subprocess.run(['tail', '-1', MAIN_CSV], + capture_output=True, text=True, timeout=10) + last_line = result.stdout.strip() + if last_line: + last_ts = int(last_line.split(',')[0]) + with open(CURSOR_FILE, 'w') as f: + json.dump({'last_timestamp': last_ts, 'last_id': None, + 'sticky_timestamp': None}, f) + print(f" Cursor updated to {last_ts} ({ts_to_str(last_ts)})") + + print(f" Total new records: {total_new}") + + # Cleanup temp files + for path in segment_files.values(): + if os.path.isfile(path): + os.remove(path) + if os.path.isdir(TEMP_DIR) and not os.listdir(TEMP_DIR): + os.rmdir(TEMP_DIR) + + +# ─── Main ───────────────────────────────────────────────────────────────────── + +def main(): + parser = argparse.ArgumentParser(description='Parallel Goldsky sync') + parser.add_argument('--workers', type=int, default=5, help='Number of parallel workers') + parser.add_argument('--end-ts', type=int, default=None, help='End timestamp (default: now)') + args = parser.parse_args() + + # Signal handling + def handle_signal(sig, frame): + print("\n⚠ Shutdown requested — finishing current batches...") + shutdown_event.set() + 
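+    # SIGINT/SIGTERM only set shutdown_event: workers stop after their current
+    # batch, and main() skips the merge so partial segment files stay in TEMP_DIR.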
+    signal.signal(signal.SIGINT, handle_signal)
+    signal.signal(signal.SIGTERM, handle_signal)
+
+    # Determine time range
+    start_ts = get_last_timestamp()
+    end_ts = args.end_ts or int(time.time())
+
+    if start_ts >= end_ts:
+        print("Already up to date!")
+        return
+
+    gap = end_ts - start_ts
+    print(f"{'='*60}")
+    print(f"Parallel Goldsky Sync")
+    print(f" Range: {ts_to_str(start_ts)} → {ts_to_str(end_ts)}")
+    print(f" Gap: {gap/86400:.1f} days")
+    print(f" Workers: {args.workers}")
+    print(f"{'='*60}\n")
+
+    # Split into segments
+    segment_size = gap // args.workers
+    segments = []
+    for i in range(args.workers):
+        seg_start = start_ts + i * segment_size
+        seg_end = start_ts + (i + 1) * segment_size if i < args.workers - 1 else end_ts
+        segments.append((i, seg_start, seg_end))
+        print(f" Worker {i}: {ts_to_str(seg_start)} → {ts_to_str(seg_end)} ({(seg_end-seg_start)/86400:.1f}d)")
+
+    print()
+    t0 = time.time()
+
+    # Run workers in parallel
+    segment_files = {}
+    record_counts = {}
+
+    with ThreadPoolExecutor(max_workers=args.workers) as pool:
+        futures = {pool.submit(sync_segment, wid, s, e): wid
+                   for wid, s, e in segments}
+
+        for future in as_completed(futures):
+            wid = futures[future]
+            try:
+                wid, count, path = future.result()
+                segment_files[wid] = path
+                record_counts[wid] = count
+            except Exception as e:
+                print(f"Worker {wid} failed: {e}")
+
+    elapsed = time.time() - t0
+    total = sum(record_counts.values())
+    print(f"\nAll workers done in {elapsed:.1f}s — {total} records total")
+    print(f"Throughput: {total/max(elapsed,1):.0f} records/sec")
+
+    if not shutdown_event.is_set() and total > 0:
+        merge_segments(segment_files, record_counts)
+    elif shutdown_event.is_set():
+        print("Shutdown — skipping merge. Temp files preserved in", TEMP_DIR)
+
+    print("Done!")
+
+
+if __name__ == '__main__':
+    main()

From d26b40217f9583711836b861bd5d9e97fe6e0641 Mon Sep 17 00:00:00 2001
From: Forge
Date: Sun, 15 Feb 2026 18:32:47 +0100
Subject: [PATCH 2/3] feat: add signal bot for whale/volume spike detection

- signals/scan.py: tails orderFilled.csv (no pandas), detects signals
- signals/detect.py: whale orders (>$50k) and volume spikes (3x+ normal)
- signals/alert.py: logs to alerts.log, Telegram support ready
- signals/config.json: configurable thresholds
- BTC market tagging via btc_price_markets.csv
- Min $1k volume filter to reduce noise
---
 signals/alert.py    |  43 ++++++++++++
 signals/config.json |  10 +++
 signals/detect.py   |  91 +++++++++++++++++++++++++
 signals/scan.py     | 157 ++++++++++++++++++++++++++++++++++++++++++++
 signals/state.json  |   3 +
 5 files changed, 304 insertions(+)
 create mode 100644 signals/alert.py
 create mode 100644 signals/config.json
 create mode 100644 signals/detect.py
 create mode 100644 signals/scan.py
 create mode 100644 signals/state.json

diff --git a/signals/alert.py b/signals/alert.py
new file mode 100644
index 0000000..331a166
--- /dev/null
+++ b/signals/alert.py
@@ -0,0 +1,43 @@
+"""Alert module — writes signals to alerts.log and stdout.
Telegram TBD.""" + +import json +import os +import sys +from datetime import datetime, timezone + +SIGNALS_DIR = os.path.dirname(os.path.abspath(__file__)) +LOG_FILE = os.path.join(SIGNALS_DIR, "alerts.log") +CONFIG_FILE = os.path.join(SIGNALS_DIR, "config.json") + + +def _load_config(): + with open(CONFIG_FILE) as f: + return json.load(f) + + +def send_alert(message: str): + """Write alert to log file and stdout.""" + ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") + line = f"[{ts}] {message}" + print(line) + with open(LOG_FILE, "a") as f: + f.write(line + "\n") + + # Telegram (future) + cfg = _load_config() + token = cfg.get("telegram_bot_token", "") + chat_id = cfg.get("telegram_chat_id", "") + if token and chat_id: + _send_telegram(token, chat_id, message) + + +def _send_telegram(token: str, chat_id: str, text: str): + """Send via Telegram Bot API.""" + import urllib.request + url = f"https://api.telegram.org/bot{token}/sendMessage" + data = json.dumps({"chat_id": chat_id, "text": text, "parse_mode": "HTML"}).encode() + req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"}) + try: + urllib.request.urlopen(req, timeout=10) + except Exception as e: + print(f"Telegram send failed: {e}", file=sys.stderr) diff --git a/signals/config.json b/signals/config.json new file mode 100644 index 0000000..8502461 --- /dev/null +++ b/signals/config.json @@ -0,0 +1,10 @@ +{ + "whale_threshold_usd": 50000, + "volume_spike_multiplier": 3.0, + "lookback_minutes": 60, + "scan_window_minutes": 15, + "btc_markets_file": "btc_price_markets.csv", + "min_spike_volume_usd": 1000, + "telegram_bot_token": "", + "telegram_chat_id": "" +} diff --git a/signals/detect.py b/signals/detect.py new file mode 100644 index 0000000..697a1ea --- /dev/null +++ b/signals/detect.py @@ -0,0 +1,91 @@ +"""Signal detection logic. Operates on pre-extracted rows (list of dicts).""" + +import csv +import io +import os +import time +from collections import defaultdict + + +def detect_whales(rows, threshold_usd=50000, btc_token_ids=None): + """Find single orders above threshold. 
Returns list of signal dicts.""" + signals = [] + # takerAmountFilled when takerAssetId == "0" means taker paid USDC + # makerAmountFilled when makerAssetId == "0" means maker paid USDC + for row in rows: + usd = 0 + if row["makerAssetId"] == "0": + usd = int(row["makerAmountFilled"]) / 1e6 + elif row["takerAssetId"] == "0": + usd = int(row["takerAmountFilled"]) / 1e6 + + if usd >= threshold_usd: + # Determine which token is the outcome token + token_id = row["makerAssetId"] if row["makerAssetId"] != "0" else row["takerAssetId"] + is_btc = btc_token_ids and token_id in btc_token_ids + signals.append({ + "type": "whale", + "usd": usd, + "token_id": token_id, + "timestamp": int(row["timestamp"]), + "tx": row["transactionHash"], + "is_btc": is_btc, + }) + return signals + + +def detect_volume_spikes(rows, window_minutes=15, lookback_minutes=60, + multiplier=3.0, btc_token_ids=None, + min_volume_usd=1000): + """Detect markets where recent volume >> historical average.""" + now = max(int(r["timestamp"]) for r in rows) if rows else int(time.time()) + window_start = now - window_minutes * 60 + lookback_start = now - lookback_minutes * 60 + + # Aggregate volume per token + window_vol = defaultdict(float) + lookback_vol = defaultdict(float) + + for row in rows: + ts = int(row["timestamp"]) + if ts < lookback_start: + continue + # USD side + usd = 0 + if row["makerAssetId"] == "0": + usd = int(row["makerAmountFilled"]) / 1e6 + elif row["takerAssetId"] == "0": + usd = int(row["takerAmountFilled"]) / 1e6 + if usd == 0: + continue + + token_id = row["makerAssetId"] if row["makerAssetId"] != "0" else row["takerAssetId"] + + if ts >= window_start: + window_vol[token_id] += usd + lookback_vol[token_id] += usd + + signals = [] + lookback_duration = lookback_minutes + window_duration = window_minutes + + for token_id, wvol in window_vol.items(): + if wvol < min_volume_usd: + continue + lvol = lookback_vol.get(token_id, 0) + if lvol == 0: + continue + # Normalize to per-minute rate + lookback_rate = lvol / lookback_duration + window_rate = wvol / window_duration + if lookback_rate > 0 and window_rate >= multiplier * lookback_rate: + ratio = window_rate / lookback_rate + is_btc = btc_token_ids and token_id in btc_token_ids + signals.append({ + "type": "volume_spike", + "token_id": token_id, + "window_volume": wvol, + "ratio": round(ratio, 1), + "is_btc": is_btc, + }) + return signals diff --git a/signals/scan.py b/signals/scan.py new file mode 100644 index 0000000..3140adc --- /dev/null +++ b/signals/scan.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python3 +"""Main scanner — tails orderFilled.csv, detects signals, sends alerts.""" + +import csv +import io +import json +import os +import subprocess +import sys +import time + +SIGNALS_DIR = os.path.dirname(os.path.abspath(__file__)) +PROJECT_DIR = os.path.dirname(SIGNALS_DIR) +CONFIG_FILE = os.path.join(SIGNALS_DIR, "config.json") +STATE_FILE = os.path.join(SIGNALS_DIR, "state.json") +ORDER_CSV = os.path.join(PROJECT_DIR, "goldsky", "orderFilled.csv") +MARKETS_CSV = os.path.join(PROJECT_DIR, "markets.csv") + + +def load_config(): + with open(CONFIG_FILE) as f: + return json.load(f) + + +def load_state(): + if os.path.exists(STATE_FILE): + with open(STATE_FILE) as f: + return json.load(f) + return {"last_scanned_timestamp": 0} + + +def save_state(state): + with open(STATE_FILE, "w") as f: + json.dump(state, f, indent=2) + + +def load_btc_token_ids(cfg): + """Load BTC market token IDs from btc_price_markets.csv.""" + path = os.path.join(PROJECT_DIR, 
cfg.get("btc_markets_file", "btc_price_markets.csv")) + token_ids = set() + if not os.path.exists(path): + print(f"Warning: BTC markets file not found: {path}", file=sys.stderr) + return token_ids + with open(path) as f: + reader = csv.DictReader(f) + for row in reader: + for col in ("token1", "token2"): + if col in row and row[col]: + token_ids.add(row[col]) + return token_ids + + +def load_market_names(): + """Load token_id -> market question mapping from markets.csv.""" + token_to_name = {} + if not os.path.exists(MARKETS_CSV): + return token_to_name + with open(MARKETS_CSV) as f: + reader = csv.DictReader(f) + for row in reader: + q = row.get("question", row.get("market_slug", "unknown")) + for col in ("token1", "token2"): + if col in row and row[col]: + token_to_name[row[col]] = q + return token_to_name + + +def tail_recent_rows(n=100000): + """Use tail to get last N lines from the big CSV, return parsed rows.""" + result = subprocess.run( + ["tail", "-n", str(n), ORDER_CSV], + capture_output=True, text=True, timeout=60 + ) + if result.returncode != 0: + print(f"tail failed: {result.stderr}", file=sys.stderr) + return [] + + header = "timestamp,maker,makerAssetId,makerAmountFilled,taker,takerAssetId,takerAmountFilled,transactionHash" + text = header + "\n" + result.stdout + reader = csv.DictReader(io.StringIO(text)) + return list(reader) + + +def main(): + from detect import detect_whales, detect_volume_spikes + from alert import send_alert + + cfg = load_config() + state = load_state() + last_ts = state.get("last_scanned_timestamp", 0) + + print(f"[scan] Loading recent rows via tail...") + rows = tail_recent_rows() + if not rows: + print("[scan] No rows found.") + return + + # Filter to only new rows + new_rows = [r for r in rows if int(r["timestamp"]) > last_ts] + print(f"[scan] Total tailed: {len(rows)}, new since last scan: {len(new_rows)}") + + if not new_rows: + print("[scan] No new data to scan.") + # Still update timestamp to latest + max_ts = max(int(r["timestamp"]) for r in rows) + state["last_scanned_timestamp"] = max_ts + save_state(state) + return + + btc_tokens = load_btc_token_ids(cfg) + print(f"[scan] Loaded {len(btc_tokens)} BTC token IDs") + + # Use ALL tailed rows for volume context but only new rows for whales + whale_signals = detect_whales( + new_rows, + threshold_usd=cfg["whale_threshold_usd"], + btc_token_ids=btc_tokens, + ) + + volume_signals = detect_volume_spikes( + rows, # full context for rate comparison + window_minutes=cfg["scan_window_minutes"], + lookback_minutes=cfg["lookback_minutes"], + multiplier=cfg["volume_spike_multiplier"], + btc_token_ids=btc_tokens, + min_volume_usd=cfg.get("min_spike_volume_usd", 1000), + ) + + # Load market names for nice formatting + market_names = load_market_names() + + total = 0 + for s in whale_signals: + name = market_names.get(s["token_id"], s["token_id"][:20] + "...") + btc_tag = " 🟠 BTC" if s.get("is_btc") else "" + ts_str = time.strftime("%Y-%m-%d %H:%M", time.gmtime(s["timestamp"])) + msg = f"🐋 Whale alert: ${s['usd']:,.0f} order on [{name}] at {ts_str}{btc_tag}" + send_alert(msg) + total += 1 + + for s in volume_signals: + name = market_names.get(s["token_id"], s["token_id"][:20] + "...") + btc_tag = " 🟠 BTC" if s.get("is_btc") else "" + msg = f"📈 Volume spike: [{name}] {s['ratio']}x normal volume (${s['window_volume']:,.0f} in window){btc_tag}" + send_alert(msg) + total += 1 + + # Update state + max_ts = max(int(r["timestamp"]) for r in new_rows) + state["last_scanned_timestamp"] = max_ts + save_state(state) + 
+ print(f"\n[scan] Done. {total} signals detected. State updated to ts={max_ts}") + + +if __name__ == "__main__": + main() diff --git a/signals/state.json b/signals/state.json new file mode 100644 index 0000000..8b96260 --- /dev/null +++ b/signals/state.json @@ -0,0 +1,3 @@ +{ + "last_scanned_timestamp": 1766922901 +} \ No newline at end of file From ae56a622e03e02c13dd04fc6d4e2097b5f437454 Mon Sep 17 00:00:00 2001 From: Forge Date: Mon, 16 Feb 2026 09:44:59 +0100 Subject: [PATCH 3/3] feat: add polymarket data filter CLI tool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Streams 84GB+ orderFilled.csv via awk — never loads into memory. Supports: --search, --from/--to, --min-usd, --token, --summary, --top-markets. Token ID matching uses awk associative arrays loaded from temp file. --- filter.py | 271 ++++++++++++++++++++++++++++++++++++++++++++++ filtered/.gitkeep | 0 2 files changed, 271 insertions(+) create mode 100755 filter.py create mode 100644 filtered/.gitkeep diff --git a/filter.py b/filter.py new file mode 100755 index 0000000..737051e --- /dev/null +++ b/filter.py @@ -0,0 +1,271 @@ +#!/usr/bin/env python3 +"""Polymarket orderFilled data filter — streams 78GB+ CSV via awk, never loads it into memory.""" + +import argparse +import csv +import io +import os +import subprocess +import sys +import tempfile +import time +from collections import defaultdict +from datetime import datetime, timezone + +DATA_DIR = os.path.dirname(os.path.abspath(__file__)) +ORDER_FILE = os.path.join(DATA_DIR, "goldsky", "orderFilled.csv") +MARKETS_FILE = os.path.join(DATA_DIR, "markets.csv") +FILTERED_DIR = os.path.join(DATA_DIR, "filtered") + +# orderFilled columns: timestamp,maker,makerAssetId,makerAmountFilled,taker,takerAssetId,takerAmountFilled,transactionHash +# Index: 0 1 2 3 4 5 6 7 + + +def load_markets(search=None): + """Load markets.csv into memory (~50MB). Returns {token_id: market_info}.""" + token_map = {} # token_id -> {question, market_slug, ...} + with open(MARKETS_FILE, "r") as f: + reader = csv.DictReader(f) + for row in reader: + q = row.get("question", "") + if search and search.lower() not in q.lower(): + continue + info = {"question": q, "slug": row.get("market_slug", ""), "volume": float(row.get("volume", 0) or 0)} + for col in ("token1", "token2"): + tid = row.get(col, "") + if tid: + token_map[tid] = info + return token_map + + +def build_awk_filter(args, token_ids=None): + """Build an awk command list to filter orderFilled.csv. 
Returns (cmd_list, needs_python_token_filter, temp_file).""" + conditions = [] + needs_python_filter = False + tmp_file = None + + # Date filters — timestamp is column $1 + if args.date_from: + ts = int(datetime.strptime(args.date_from, "%Y-%m-%d").replace(tzinfo=timezone.utc).timestamp()) + conditions.append(f"$1 >= {ts}") + if args.date_to: + ts = int(datetime.strptime(args.date_to, "%Y-%m-%d").replace(tzinfo=timezone.utc).timestamp()) + conditions.append(f"$1 < {ts}") + + # Min USD filter + if args.min_usd: + threshold = int(args.min_usd * 1e6) + conditions.append(f'($3=="0" && $4 >= {threshold}) || ($6=="0" && $7 >= {threshold})') + + # Token filter (specific token ID) + if args.token: + conditions.append(f'$3=="{args.token}" || $6=="{args.token}"') + + # For --search: write token IDs to temp file, load in awk + token_cond = "" + if token_ids is not None: + tmp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) + for tid in token_ids: + tmp_file.write(tid + "\n") + tmp_file.close() + token_cond = f'(($3 in t) || ($6 in t))' + + cond_parts = [] + if token_cond: + cond_parts.append(token_cond) + cond_parts.extend(f"({c})" for c in conditions) + cond_str = " && ".join(cond_parts) if cond_parts else "1" + + if tmp_file: + prog = f'BEGIN{{FS=","}} FILENAME==ARGV[1]{{t[$1]=1;next}} NR==1{{next}} {cond_str} {{print}}' + cmd = ["awk", prog, tmp_file.name, ORDER_FILE] + else: + prog = f'BEGIN{{FS=","}} NR==1{{next}} {cond_str} {{print}}' + cmd = ["awk", prog, ORDER_FILE] + + return cmd, needs_python_filter, tmp_file + + +def get_usd_amount(row_parts): + """Extract USD amount from a parsed row. Returns float USD.""" + maker_asset = row_parts[2] + taker_asset = row_parts[5] + if maker_asset == "0": + return int(row_parts[3]) / 1e6 + elif taker_asset == "0": + return int(row_parts[6]) / 1e6 + return 0.0 + + +def get_token_id(row_parts): + """Get the non-zero token ID (the outcome token, not USDC).""" + if row_parts[2] != "0": + return row_parts[2] + if row_parts[5] != "0": + return row_parts[5] + return "" + + +def run_filter(args): + token_map = None + token_ids = None + + # Load markets if needed + if args.search or args.summary or args.top_markets: + print(f"Loading markets.csv...", file=sys.stderr) + if args.search: + token_map = load_markets(search=args.search) + token_ids = set(token_map.keys()) + print(f" Found {len(token_ids)} token IDs matching '{args.search}' ({len(set(m['question'] for m in token_map.values()))} markets)", file=sys.stderr) + if not token_ids: + print("No markets match that search.", file=sys.stderr) + return + else: + token_map = load_markets() + token_ids = None + + awk_cmd, needs_python_filter, tmp_file = build_awk_filter(args, token_ids) + + print(f"Scanning {ORDER_FILE}...", file=sys.stderr) + proc = subprocess.Popen( + awk_cmd, + stdout=subprocess.PIPE, + text=True, + bufsize=1024 * 1024, + ) + + header = "timestamp,maker,makerAssetId,makerAmountFilled,taker,takerAssetId,takerAmountFilled,transactionHash" + + # Prepare output + out_file = None + if args.output and not args.summary and not args.top_markets: + out_path = args.output + if not os.path.isabs(out_path): + out_path = os.path.join(FILTERED_DIR, out_path) + os.makedirs(os.path.dirname(out_path), exist_ok=True) + out_file = open(out_path, "w") + out_file.write(header + "\n") + + # Stats + count = 0 + total_usd = 0.0 + market_volumes = defaultdict(float) # question -> usd + min_ts = float("inf") + max_ts = 0 + start_time = time.time() + + try: + for line in proc.stdout: + line = 
line.rstrip("\n") + parts = line.split(",", 7) + if len(parts) < 8: + continue + + # Python-side token filter for large token sets + if needs_python_filter and token_ids: + if parts[2] not in token_ids and parts[5] not in token_ids: + continue + + count += 1 + ts = int(parts[0]) + usd = get_usd_amount(parts) + total_usd += usd + + if ts < min_ts: + min_ts = ts + if ts > max_ts: + max_ts = ts + + if args.summary or args.top_markets: + tid = get_token_id(parts) + if token_map and tid in token_map: + market_volumes[token_map[tid]["question"]] += usd + else: + market_volumes[tid] += usd + + if out_file: + out_file.write(line + "\n") + + if count % 1_000_000 == 0: + elapsed = time.time() - start_time + print(f" {count:,} rows processed ({elapsed:.0f}s)...", file=sys.stderr) + + except KeyboardInterrupt: + print("\nInterrupted.", file=sys.stderr) + finally: + proc.stdout.close() + proc.wait() + if out_file: + out_file.close() + if tmp_file: + os.unlink(tmp_file.name) + + elapsed = time.time() - start_time + + # Print results + if args.summary or args.top_markets: + print(f"\n{'='*60}", file=sys.stderr) + print(f"Results:", file=sys.stderr) + print(f" Rows: {count:,}", file=sys.stderr) + print(f" USD Volume: ${total_usd:,.2f}", file=sys.stderr) + if count > 0: + print(f" Date Range: {datetime.fromtimestamp(min_ts, tz=timezone.utc).strftime('%Y-%m-%d')} → {datetime.fromtimestamp(max_ts, tz=timezone.utc).strftime('%Y-%m-%d')}", file=sys.stderr) + print(f" Unique Markets: {len(market_volumes):,}", file=sys.stderr) + + n = args.top_markets or 10 + top = sorted(market_volumes.items(), key=lambda x: -x[1])[:n] + if top: + print(f"\n Top {len(top)} Markets by Volume:", file=sys.stderr) + for i, (q, vol) in enumerate(top, 1): + label = q if len(q) < 70 else q[:67] + "..." + print(f" {i:>3}. 
${vol:>14,.2f} {label}", file=sys.stderr) + print(f"\n Time: {elapsed:.1f}s", file=sys.stderr) + + # If --top-markets with -o, write CSV + if args.top_markets and args.output: + out_path = args.output + if not os.path.isabs(out_path): + out_path = os.path.join(FILTERED_DIR, out_path) + os.makedirs(os.path.dirname(out_path), exist_ok=True) + with open(out_path, "w") as f: + f.write("rank,volume_usd,market\n") + for i, (q, vol) in enumerate(top, 1): + q_escaped = q.replace('"', '""') + f.write(f'{i},{vol:.2f},"{q_escaped}"\n') + print(f" Written to {out_path}", file=sys.stderr) + else: + print(f"\nDone: {count:,} rows, ${total_usd:,.2f} USD volume, {elapsed:.1f}s", file=sys.stderr) + if out_file: + out_path = args.output + if not os.path.isabs(out_path): + out_path = os.path.join(FILTERED_DIR, out_path) + size = os.path.getsize(out_path) + unit = "KB" if size < 1e6 else "MB" if size < 1e9 else "GB" + divisor = 1e3 if size < 1e6 else 1e6 if size < 1e9 else 1e9 + print(f" Output: {out_path} ({size/divisor:.1f} {unit})", file=sys.stderr) + + +def main(): + parser = argparse.ArgumentParser(description="Filter Polymarket orderFilled data (78GB+)") + parser.add_argument("--search", "-s", help="Filter by market keyword (searches question text)") + parser.add_argument("--from", dest="date_from", help="Start date (YYYY-MM-DD, inclusive)") + parser.add_argument("--to", dest="date_to", help="End date (YYYY-MM-DD, exclusive)") + parser.add_argument("--min-usd", type=float, help="Minimum USD value per order") + parser.add_argument("--token", help="Filter by specific token/asset ID") + parser.add_argument("--output", "-o", help="Output CSV filename (saved to filtered/ dir)") + parser.add_argument("--summary", action="store_true", help="Show stats only, don't output rows") + parser.add_argument("--top-markets", type=int, metavar="N", help="Show top N markets by volume") + args = parser.parse_args() + + if not args.summary and not args.top_markets and not args.output: + parser.error("Specify -o OUTPUT, --summary, or --top-markets") + + if not os.path.exists(ORDER_FILE): + print(f"Error: {ORDER_FILE} not found", file=sys.stderr) + sys.exit(1) + + run_filter(args) + + +if __name__ == "__main__": + main() diff --git a/filtered/.gitkeep b/filtered/.gitkeep new file mode 100644 index 0000000..e69de29