From 883f1f80314bfb69a7f59938cdebd9e6aa3e9c3d Mon Sep 17 00:00:00 2001
From: ONE
Date: Sun, 8 Mar 2026 10:00:09 +0800
Subject: [PATCH] feat(agents): add Dubai villa lead scraper + Apify bridge + Sheets sync

- agents/dubai_villa_scraper.py: stdlib-only scraper for PropertyFinder + Bayut
  - Extracts UAE phone numbers, owner names, areas, prices
  - Deduplicates by phone number
  - Outputs JSON to data/state/villa_leads.json
- agents/apify_dubai_scraper.py: Apify actor bridge (faster path)
  - Uses redoubtable_bubble/dubai-real-estate-scraper actor
  - Handles anti-bot automatically
  - Requires APIFY_TOKEN in .env
- core/leads-bridge.js: syncs villa_leads.json to Google Sheets CRM
  - Deduplicates synced leads
  - Uses existing SheetsService pattern

Use case: IXR interior design client acquisition from Dubai villa owners
---
 README.md                     |  24 ++-
 agents/apify_dubai_scraper.py | 216 ++++++++++++++++++++++
 agents/dubai_villa_scraper.py | 335 ++++++++++++++++++++++++++++++++++
 agents/requirements.txt       |   4 +-
 core/leads-bridge.js          |  64 +++++++
 5 files changed, 641 insertions(+), 2 deletions(-)
 create mode 100644 agents/apify_dubai_scraper.py
 create mode 100644 agents/dubai_villa_scraper.py
 create mode 100644 core/leads-bridge.js

diff --git a/README.md b/README.md
index 838461a..1c973ec 100644
--- a/README.md
+++ b/README.md
@@ -53,4 +53,26 @@ python agents/hello.py
 - `npm run dev` - runs UI + Core + Orchestrator
 - `npm run ui` - starts Next.js UI
 - `npm run core` - starts Express API
-- `npm run agent` - runs hello agent directly
\ No newline at end of file
+- `npm run agent` - runs hello agent directly
+
+## Dubai Villa Lead Scraper
+
+Two scraper agents for collecting direct villa-owner contacts in Dubai:
+
+### Basic Scraper (no API key needed)
+```bash
+python agents/dubai_villa_scraper.py --source both --area "Palm Jumeirah" --max 30
+```
+
+### Apify Scraper (faster, more reliable)
+Requires `APIFY_TOKEN` in `.env`.
+```bash
+python agents/apify_dubai_scraper.py --area "Emirates Hills" --max 100
+```
+
+Leads are saved to `data/state/villa_leads.json` and can be synced to Google Sheets via `core/leads-bridge.js`.
+
+### Sync to Sheets
+```js
+const { syncLeadsToSheets } = require('./core/leads-bridge');
+syncLeadsToSheets().then((result) => console.log(result));
+```
diff --git a/agents/apify_dubai_scraper.py b/agents/apify_dubai_scraper.py
new file mode 100644
index 0000000..6c2b944
--- /dev/null
+++ b/agents/apify_dubai_scraper.py
@@ -0,0 +1,216 @@
+"""
+Apify Dubai Real Estate API Bridge
+====================================
+Uses Apify's ready-made Dubai Real Estate Scraper actor to get
+owner contacts from PropertyFinder, Bayut & Dubizzle.
+
+This is the FAST path - uses Apify's actor, which handles anti-bot measures.
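+It starts an actor run, polls until the run reaches a terminal status, then
+downloads the run's dataset items and normalizes them into our lead schema.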
+Requires APIFY_TOKEN in .env
+
+Usage:
+    python agents/apify_dubai_scraper.py
+    python agents/apify_dubai_scraper.py --area "Palm Jumeirah" --max 100
+"""
+
+import argparse
+import json
+import time
+import urllib.request
+import urllib.error
+import os
+from datetime import datetime, timezone
+from pathlib import Path
+
+ROOT_DIR = Path(__file__).resolve().parent.parent
+STATE_DIR = ROOT_DIR / "data" / "state"
+LOG_DIR = ROOT_DIR / "data" / "logs"
+LEADS_FILE = STATE_DIR / "villa_leads.json"
+LOG_FILE = LOG_DIR / "apify_scraper.log"
+
+STATE_DIR.mkdir(parents=True, exist_ok=True)
+LOG_DIR.mkdir(parents=True, exist_ok=True)
+
+# Apify actor ID for Dubai Real Estate Scraper
+ACTOR_ID = "redoubtable_bubble~dubai-real-estate-scraper-propertyfinder-bayut-dubizzle"
+
+
+def log(msg: str) -> None:
+    line = f"[{datetime.now(timezone.utc).isoformat()}] [apify-scraper] {msg}"
+    print(line, flush=True)
+    with open(LOG_FILE, "a", encoding="utf-8") as f:
+        f.write(line + "\n")
+
+
+def now_iso() -> str:
+    return datetime.now(timezone.utc).isoformat()
+
+
+def apify_request(method: str, path: str, token: str, body: dict | None = None) -> dict | list:
+    url = f"https://api.apify.com/v2{path}?token={token}"
+    data = json.dumps(body).encode() if body else None
+    headers = {"Content-Type": "application/json"}
+    req = urllib.request.Request(url, data=data, headers=headers, method=method)
+    try:
+        with urllib.request.urlopen(req, timeout=30) as resp:
+            return json.loads(resp.read())
+    except urllib.error.HTTPError as e:
+        error_body = e.read().decode()
+        log(f"Apify API error {e.code}: {error_body}")
+        return {"error": str(e.code), "message": error_body}
+    except Exception as e:
+        log(f"Request error: {e}")
+        return {"error": str(e)}
+
+
+def run_actor(token: str, area: str, max_items: int, property_type: str = "villa") -> str | None:
+    """Start the Apify actor run and return run ID."""
+    payload = {
+        "searchQuery": f"{property_type} {area} Dubai" if area else f"{property_type} Dubai",
+        "maxItems": max_items,
+        "propertyType": property_type,
+        "listingType": "rent",
+        "location": area or "Dubai",
+        "directOwnerOnly": True
+    }
+    log(f"Starting Apify actor: {ACTOR_ID}")
+    log(f"Payload: {json.dumps(payload)}")
+
+    result = apify_request("POST", f"/acts/{ACTOR_ID}/runs", token, payload)
+
+    if isinstance(result, dict) and "data" in result:
+        run_id = result["data"]["id"]
+        log(f"Actor started. Run ID: {run_id}")
+        return run_id
+    else:
+        log(f"Failed to start actor: {result}")
+        return None
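+
+
+# For reference, the runs endpoint wraps a successful start in "data"
+# (abridged): {"data": {"id": "<runId>", "status": "READY", ...}}, which is
+# why run_actor reads result["data"]["id"] above.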
Run ID: {run_id}") + return run_id + else: + log(f"Failed to start actor: {result}") + return None + + +def wait_for_run(token: str, run_id: str, timeout: int = 300) -> bool: + """Wait for actor run to finish.""" + log(f"Waiting for run {run_id} to complete...") + start = time.time() + while time.time() - start < timeout: + result = apify_request("GET", f"/actor-runs/{run_id}", token) + status = result.get("data", {}).get("status", "") + log(f" Status: {status}") + if status in ("SUCCEEDED", "FINISHED"): + return True + if status in ("FAILED", "ABORTED", "TIMED-OUT"): + log(f"Run failed with status: {status}") + return False + time.sleep(10) + log("Timeout waiting for actor run") + return False + + +def fetch_results(token: str, run_id: str) -> list[dict]: + """Fetch results from completed actor run.""" + result = apify_request("GET", f"/actor-runs/{run_id}/dataset/items", token) + items = result.get("data", {}).get("items", []) + log(f"Fetched {len(items)} items from Apify") + return items + + +def normalize_lead(item: dict, area: str) -> dict | None: + """Convert Apify result to our lead format.""" + # Apify actor returns various fields - normalize them + phone = ( + item.get("phone") or + item.get("contactPhone") or + item.get("agentPhone") or + item.get("ownerPhone") or "" + ) + name = ( + item.get("agentName") or + item.get("ownerName") or + item.get("contactName") or + "Unknown" + ) + if not phone: + return None + + return { + "name": name.strip(), + "phone": phone.strip(), + "all_phones": [phone.strip()], + "area": item.get("location") or item.get("area") or area or "Dubai", + "type": "villa", + "price": str(item.get("price", "")), + "url": item.get("url") or item.get("propertyUrl", ""), + "source": item.get("source") or "Apify/Dubai", + "direct_owner": item.get("directOwner", False), + "unit_number": item.get("unitNumber", ""), + "scraped_at": now_iso() + } + + +def load_existing_leads() -> list: + if LEADS_FILE.exists(): + try: + return json.loads(LEADS_FILE.read_text(encoding="utf-8")) + except Exception: + return [] + return [] + + +def save_leads(leads: list) -> None: + LEADS_FILE.write_text(json.dumps(leads, indent=2, ensure_ascii=False), encoding="utf-8") + + +def deduplicate(existing: list, new_leads: list) -> tuple[list, int]: + existing_phones = {lead["phone"] for lead in existing} + unique_new = [] + for lead in new_leads: + if lead["phone"] not in existing_phones: + unique_new.append(lead) + existing_phones.add(lead["phone"]) + return unique_new, len(new_leads) - len(unique_new) + + +def main() -> None: + parser = argparse.ArgumentParser(description="Apify Dubai Villa Scraper") + parser.add_argument("--area", type=str, default="", help="Area in Dubai (e.g. 'Palm Jumeirah')") + parser.add_argument("--max", type=int, default=50, help="Max leads to scrape") + parser.add_argument("--token", type=str, default=os.environ.get("APIFY_TOKEN", ""), help="Apify API token") + args = parser.parse_args() + + if not args.token: + log("ERROR: APIFY_TOKEN not set. 
+
+
+def load_existing_leads() -> list:
+    if LEADS_FILE.exists():
+        try:
+            return json.loads(LEADS_FILE.read_text(encoding="utf-8"))
+        except Exception:
+            return []
+    return []
+
+
+def save_leads(leads: list) -> None:
+    LEADS_FILE.write_text(json.dumps(leads, indent=2, ensure_ascii=False), encoding="utf-8")
+
+
+def deduplicate(existing: list, new_leads: list) -> tuple[list, int]:
+    existing_phones = {lead["phone"] for lead in existing}
+    unique_new = []
+    for lead in new_leads:
+        if lead["phone"] not in existing_phones:
+            unique_new.append(lead)
+            existing_phones.add(lead["phone"])
+    return unique_new, len(new_leads) - len(unique_new)
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser(description="Apify Dubai Villa Scraper")
+    parser.add_argument("--area", type=str, default="", help="Area in Dubai (e.g. 'Palm Jumeirah')")
+    parser.add_argument("--max", type=int, default=50, help="Max leads to scrape")
+    parser.add_argument("--token", type=str, default=os.environ.get("APIFY_TOKEN", ""), help="Apify API token")
+    args = parser.parse_args()
+
+    if not args.token:
+        log("ERROR: APIFY_TOKEN not set. Add it to .env or pass --token")
+        print('__RESULT__:{"status":"error","message":"APIFY_TOKEN not set"}')
+        return
+
+    log("=== Apify Dubai Villa Scraper Started ===")
+
+    run_id = run_actor(args.token, args.area, args.max)
+    if not run_id:
+        print('__RESULT__:{"status":"error","message":"Failed to start actor"}')
+        return
+
+    success = wait_for_run(args.token, run_id)
+    if not success:
+        print('__RESULT__:{"status":"error","message":"Actor run failed"}')
+        return
+
+    raw_items = fetch_results(args.token, run_id)
+    new_leads = [n for item in raw_items if (n := normalize_lead(item, args.area)) is not None]
+
+    existing = load_existing_leads()
+    unique_leads, dupes = deduplicate(existing, new_leads)
+    all_leads = existing + unique_leads
+    save_leads(all_leads)
+
+    log(f"=== Done. New: {len(unique_leads)}, Skipped: {dupes}, Total: {len(all_leads)} ===")
+
+    summary = {
+        "status": "ok",
+        "new_leads": len(unique_leads),
+        "total_leads": len(all_leads),
+        "duplicates_skipped": dupes,
+        "leads": unique_leads
+    }
+    print(f"\n__RESULT__:{json.dumps(summary)}")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/agents/dubai_villa_scraper.py b/agents/dubai_villa_scraper.py
new file mode 100644
index 0000000..b1248d8
--- /dev/null
+++ b/agents/dubai_villa_scraper.py
@@ -0,0 +1,335 @@
+"""
+Dubai Villa Lead Scraper Agent
+================================
+Scrapes villa listings from PropertyFinder and Bayut for direct-owner contact details.
+Outputs structured JSON for the orchestrator to pass to the Google Sheets CRM.
+
+Usage:
+    python agents/dubai_villa_scraper.py
+    python agents/dubai_villa_scraper.py --source propertyfinder --max 50
+    python agents/dubai_villa_scraper.py --source bayut --area "Palm Jumeirah"
+
+Output:
+    Writes to data/state/villa_leads.json
+    Each lead: { name, phone, all_phones, area, type, price, url, source,
+    direct_owner, scraped_at }
+"""
+
+import argparse
+import json
+import time
+import urllib.request
+import urllib.parse
+import urllib.error
+import re
+from datetime import datetime, timezone
+from pathlib import Path
+
+ROOT_DIR = Path(__file__).resolve().parent.parent
+STATE_DIR = ROOT_DIR / "data" / "state"
+LOG_DIR = ROOT_DIR / "data" / "logs"
+LEADS_FILE = STATE_DIR / "villa_leads.json"
+SCRAPER_LOG = LOG_DIR / "scraper.log"
+
+STATE_DIR.mkdir(parents=True, exist_ok=True)
+LOG_DIR.mkdir(parents=True, exist_ok=True)
+
+
+def log(msg: str) -> None:
+    line = f"[{datetime.now(timezone.utc).isoformat()}] [villa-scraper] {msg}"
+    print(line, flush=True)
+    with open(SCRAPER_LOG, "a", encoding="utf-8") as f:
+        f.write(line + "\n")
+
+
+def now_iso() -> str:
+    return datetime.now(timezone.utc).isoformat()
+
+
+def load_existing_leads() -> list:
+    if LEADS_FILE.exists():
+        try:
+            return json.loads(LEADS_FILE.read_text(encoding="utf-8"))
+        except Exception:
+            return []
+    return []
+
+
+def save_leads(leads: list) -> None:
+    LEADS_FILE.write_text(json.dumps(leads, indent=2, ensure_ascii=False), encoding="utf-8")
+
+
+def fetch_url(url: str, headers: dict | None = None) -> str | None:
+    """Fetch a URL with retry logic."""
+    default_headers = {
+        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
+        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
+        "Accept-Language": "en-US,en;q=0.5",
+    }
+    if headers:
+        default_headers.update(headers)
+
+    req = urllib.request.Request(url, headers=default_headers)
+    for attempt in range(3):
+        try:
+            with urllib.request.urlopen(req, timeout=15) as resp:
+                return resp.read().decode("utf-8", errors="replace")
+        except urllib.error.HTTPError as e:
+            log(f"HTTP {e.code} on attempt {attempt+1}: {url}")
+            if e.code in (403, 429):
+                time.sleep(5 * (attempt + 1))
+            else:
+                break
+        except Exception as e:
+            log(f"Error on attempt {attempt+1}: {e}")
+            time.sleep(3)
+    return None
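+
+
+# UAE contact numbers appear in listings as +971 5X XXX XXXX or 00971...,
+# as local mobiles (05X XXX XXXX), or as Dubai landlines (04 XXX XXXX).
+# extract_phones matches all four shapes and strips spaces/hyphens, e.g.
+# "+971 50-123 4567" -> "+971501234567".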
+
+
+def extract_phones(text: str) -> list[str]:
+    """Extract UAE phone numbers from text."""
+    patterns = [
+        r'\+971[\s\-]?\d{2}[\s\-]?\d{3}[\s\-]?\d{4}',
+        r'00971[\s\-]?\d{2}[\s\-]?\d{3}[\s\-]?\d{4}',
+        r'05\d[\s\-]?\d{3}[\s\-]?\d{4}',
+        r'04[\s\-]?\d{3}[\s\-]?\d{4}',
+    ]
+    phones = []
+    for pattern in patterns:
+        found = re.findall(pattern, text)
+        phones.extend(found)
+    # Normalize: drop separators, deduplicate, preserve order
+    normalized = []
+    for p in phones:
+        clean = re.sub(r'[\s\-]', '', p)
+        if clean not in normalized:
+            normalized.append(clean)
+    return normalized
+
+
+def scrape_propertyfinder(area: str = "", max_results: int = 30) -> list[dict]:
+    """Scrape PropertyFinder for direct-owner Dubai villa listings."""
+    leads = []
+    page = 1
+
+    base_url = "https://www.propertyfinder.ae/en/search?c=2&t=1&fu=1&rp=y"
+    if area:
+        base_url += f"&l={urllib.parse.quote(area)}"
+
+    log(f"PropertyFinder: starting scrape (area={area or 'all Dubai'}, max={max_results})")
+
+    while len(leads) < max_results:
+        url = f"{base_url}&page={page}"
+        html = fetch_url(url)
+        if not html:
+            break
+
+        # PropertyFinder embeds listing data as JSON, but the anchor markup is
+        # enough here: pull listing URLs and visit each detail page.
+        listing_urls = re.findall(
+            r'href="(/en/[^"]*villa[^"]*)" class="[^"]*card[^"]*"',
+            html
+        )
+        if not listing_urls:
+            listing_urls = re.findall(
+                r'"(/en/property/[^"]+)"',
+                html
+            )
+
+        if not listing_urls:
+            log(f"PropertyFinder page {page}: no listings found, stopping")
+            break
+
+        log(f"PropertyFinder page {page}: found {len(listing_urls)} potential listings")
+
+        for path in listing_urls[:10]:  # Process up to 10 per page
+            if len(leads) >= max_results:
+                break
+
+            listing_url = f"https://www.propertyfinder.ae{path}"
+            time.sleep(1.5)  # Polite delay
+
+            listing_html = fetch_url(listing_url)
+            if not listing_html:
+                continue
+
+            # Extract contact info
+            phones = extract_phones(listing_html)
+
+            # Extract name (JSON field or agent-name element)
+            name_match = re.search(
+                r'"agent[Nn]ame"\s*:\s*"([^"]+)"'
+                r'|<[^>]*class="[^"]*agent-name[^"]*"[^>]*>([^<]+)<',
+                listing_html
+            )
+            name = ""
+            if name_match:
+                name = (name_match.group(1) or name_match.group(2) or "").strip()
+
+            # Extract price
+            price_match = re.search(r'"price"\s*:\s*(\d+)', listing_html)
+            price = price_match.group(1) if price_match else ""
+
+            # Check if direct owner (not agent)
+            is_direct = bool(re.search(
+                r'direct.*owner|owner.*direct|by.*owner|no.*commission',
+                listing_html, re.IGNORECASE
+            ))
+
+            if phones:
+                lead = {
+                    "name": name or "Unknown",
+                    "phone": phones[0],
+                    "all_phones": phones,
+                    "area": area or "Dubai",
+                    "type": "villa",
+                    "price": price,
+                    "url": listing_url,
+                    "source": "PropertyFinder",
+                    "direct_owner": is_direct,
+                    "scraped_at": now_iso()
+                }
+                leads.append(lead)
+                log(f"  Lead: {name or 'Unknown'} | {phones[0]} | {listing_url}")
+
+        page += 1
+        time.sleep(2)
+
+    log(f"PropertyFinder: collected {len(leads)} leads")
+    return leads
+
+
+def scrape_bayut(area: str = "", max_results: int = 30) -> list[dict]:
+    """Scrape Bayut for direct-owner Dubai villa listings."""
+    leads = []
+
+    # Bayut expects a slug in the path (e.g. "palm-jumeirah"), not a
+    # URL-encoded string, so slugify rather than quote.
+    area_slug = area.lower().replace(" ", "-") if area else "dubai"
+    url = f"https://www.bayut.com/for-rent/villa/{area_slug}/?owner_only=1"
+
+    log(f"Bayut: starting scrape (area={area or 'Dubai'}, max={max_results})")
+    html = fetch_url(url)
+
+    if not html:
+        log("Bayut: failed to fetch listings page")
+        return leads
+
+    # Extract listing links
+    listing_urls = re.findall(r'"(https://www\.bayut\.com/property/[^"]+)"', html)
+    if not listing_urls:
+        listing_urls = re.findall(r'href="(/property/[^"]+)"', html)
+        listing_urls = [f"https://www.bayut.com{u}" for u in listing_urls]
+
+    log(f"Bayut: found {len(listing_urls)} listing URLs")
+
+    for listing_url in listing_urls[:max_results]:
+        time.sleep(1.5)
+        listing_html = fetch_url(listing_url)
+        if not listing_html:
+            continue
+
+        phones = extract_phones(listing_html)
+
+        name_match = re.search(
+            r'"name"\s*:\s*"([^"]+)".*?"@type"\s*:\s*"(Person|RealEstateAgent)"'
+            r'|<[^>]*class="[^"]*agent-name[^"]*"[^>]*>\s*([^<]+)',
+            listing_html, re.DOTALL
+        )
+        name = ""
+        if name_match:
+            name = (name_match.group(1) or name_match.group(3) or "").strip()
+
+        price_match = re.search(r'"price"\s*:\s*"?(\d+)"?', listing_html)
+        price = price_match.group(1) if price_match else ""
+
+        area_match = re.search(r'"addressLocality"\s*:\s*"([^"]+)"', listing_html)
+        detected_area = area_match.group(1) if area_match else (area or "Dubai")
+
+        if phones:
+            lead = {
+                "name": name or "Unknown",
+                "phone": phones[0],
+                "all_phones": phones,
+                "area": detected_area,
+                "type": "villa",
+                "price": price,
+                "url": listing_url,
+                "source": "Bayut",
+                "direct_owner": True,  # listings filtered by owner_only=1
+                "scraped_at": now_iso()
+            }
+            leads.append(lead)
+            log(f"  Lead: {name or 'Unknown'} | {phones[0]} | {detected_area}")
+
+    log(f"Bayut: collected {len(leads)} leads")
+    return leads
+
+
+def deduplicate(existing: list, new_leads: list) -> tuple[list, int]:
+    """Deduplicate by phone number."""
+    existing_phones = {lead["phone"] for lead in existing}
+    unique_new = []
+    for lead in new_leads:
+        if lead["phone"] not in existing_phones:
+            unique_new.append(lead)
+            existing_phones.add(lead["phone"])
+    return unique_new, len(new_leads) - len(unique_new)
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser(description="Dubai Villa Lead Scraper")
+    parser.add_argument("--source", choices=["propertyfinder", "bayut", "both"], default="both")
+    parser.add_argument("--area", type=str, default="", help="Dubai area (e.g. 'Palm Jumeirah', 'Emirates Hills')")
+    parser.add_argument("--max", type=int, default=30, help="Max leads per source")
+    parser.add_argument("--log-file", type=str, default=None, help="Extra log file path")
+    args = parser.parse_args()
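+    # NOTE: --log-file is accepted but not yet wired up; output currently
+    # always goes to data/logs/scraper.log via log().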
+
+    log("=== Dubai Villa Scraper Started ===")
+    log(f"Source: {args.source} | Area: {args.area or 'All Dubai'} | Max: {args.max}")
+
+    existing = load_existing_leads()
+    log(f"Existing leads in DB: {len(existing)}")
+
+    new_leads = []
+
+    if args.source in ("propertyfinder", "both"):
+        pf_leads = scrape_propertyfinder(area=args.area, max_results=args.max)
+        new_leads.extend(pf_leads)
+
+    if args.source in ("bayut", "both"):
+        bayut_leads = scrape_bayut(area=args.area, max_results=args.max)
+        new_leads.extend(bayut_leads)
+
+    unique_leads, dupes = deduplicate(existing, new_leads)
+    log(f"New unique leads: {len(unique_leads)} | Duplicates skipped: {dupes}")
+
+    all_leads = existing + unique_leads
+    save_leads(all_leads)
+
+    log(f"=== Done. Total leads in DB: {len(all_leads)} ===")
+
+    # Print summary JSON for the orchestrator to consume
+    summary = {
+        "status": "ok",
+        "new_leads": len(unique_leads),
+        "total_leads": len(all_leads),
+        "duplicates_skipped": dupes,
+        "leads": unique_leads
+    }
+    print(f"\n__RESULT__:{json.dumps(summary)}")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/agents/requirements.txt b/agents/requirements.txt
index ef11339..392d8d4 100644
--- a/agents/requirements.txt
+++ b/agents/requirements.txt
@@ -1 +1,3 @@
-# No external dependencies required.
\ No newline at end of file
+# No external dependencies for base scraper (uses stdlib only)
+# For Apify integration: no extra deps needed (uses urllib)
+# Optional: pip install requests beautifulsoup4 (faster scraping)
diff --git a/core/leads-bridge.js b/core/leads-bridge.js
new file mode 100644
index 0000000..ba13e43
--- /dev/null
+++ b/core/leads-bridge.js
@@ -0,0 +1,64 @@
+/**
+ * Leads Bridge - connects villa scraper output to the Google Sheets CRM.
+ * Reads data/state/villa_leads.json and syncs new leads to Sheets.
+ */
+const fs = require("fs");
+const path = require("path");
+const SheetsService = require("./sheets");
+
+const ROOT_DIR = path.resolve(__dirname, "..");
+const LEADS_FILE = path.join(ROOT_DIR, "data", "state", "villa_leads.json");
+const SYNCED_FILE = path.join(ROOT_DIR, "data", "state", "synced_leads.json");
+
+function loadLeads() {
+  if (!fs.existsSync(LEADS_FILE)) return [];
+  try { return JSON.parse(fs.readFileSync(LEADS_FILE, "utf8")); }
+  catch { return []; }
+}
+
+function loadSynced() {
+  if (!fs.existsSync(SYNCED_FILE)) return [];
+  try { return JSON.parse(fs.readFileSync(SYNCED_FILE, "utf8")); }
+  catch { return []; }
+}
+
+function saveSynced(leads) {
+  fs.writeFileSync(SYNCED_FILE, JSON.stringify(leads, null, 2));
+}
+
+async function syncLeadsToSheets() {
+  const sheetsService = new SheetsService();
+  const initialized = await sheetsService.init();
+  if (!initialized) {
+    console.log("[leads-bridge] Sheets not initialized, skipping sync");
+    return { synced: 0, error: "Sheets not configured" };
+  }
+
+  const allLeads = loadLeads();
+  const syncedPhones = new Set(loadSynced().map(l => l.phone));
+  const unsynced = allLeads.filter(l => !syncedPhones.has(l.phone));
+
+  console.log(`[leads-bridge] ${unsynced.length} new leads to sync`);
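+
+  // Leads are written one at a time through SheetsService.addLead; slower
+  // than batching, but it reuses the existing service without new API surface.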
+  let synced = 0;
+  const succeeded = [];
+  for (const lead of unsynced) {
+    try {
+      await sheetsService.addLead({
+        name: lead.name,
+        phone: lead.phone,
+        notes: `Area: ${lead.area} | Price: ${lead.price} | Source: ${lead.source} | ${lead.url}`
+      });
+      synced++;
+      succeeded.push(lead);
+    } catch (err) {
+      console.error(`[leads-bridge] Failed to sync ${lead.phone}:`, err.message);
+    }
+  }
+
+  // Record only the leads that actually reached Sheets, so failed ones are
+  // retried on the next run
+  saveSynced([...loadSynced(), ...succeeded]);
+
+  return { synced, total: allLeads.length };
+}
+
+module.exports = { syncLeadsToSheets };
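+
+// Convenience: `node core/leads-bridge.js` runs a one-off sync directly.
+if (require.main === module) {
+  syncLeadsToSheets().then((res) => console.log("[leads-bridge] result:", res));
+}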