From 31d6d212e35a26559c8af32ba62f9e03fdaa7968 Mon Sep 17 00:00:00 2001 From: vincentayorinde Date: Thu, 12 Mar 2026 20:29:41 +0000 Subject: [PATCH] privacy: mask phone numbers by default and add --reveal override --- whatsapp-osint.py | 107 ++++++++++++++++++++++++++++++---------------- 1 file changed, 71 insertions(+), 36 deletions(-) diff --git a/whatsapp-osint.py b/whatsapp-osint.py index f8ef188..96333d9 100644 --- a/whatsapp-osint.py +++ b/whatsapp-osint.py @@ -1,6 +1,7 @@ import os import re import base64 +import sys import requests import json import time @@ -51,6 +52,39 @@ def sanitize_phone(raw: str) -> str: def is_valid_phone(p: str) -> bool: return p.isdigit() and 8 <= len(p) <= 15 +def mask_phone(phone: str) -> str: + digits = sanitize_phone(phone) + if not digits: + return phone + if len(digits) <= 4: + return "*" * len(digits) + prefix_len = min(3, max(1, len(digits) - 4)) + suffix_len = 4 if len(digits) > 7 else 3 + masked_len = max(len(digits) - prefix_len - suffix_len, 1) + return f"{digits[:prefix_len]}{'*' * masked_len}{digits[-suffix_len:]}" + +def display_phone(phone: str, reveal: bool = False) -> str: + return phone if reveal else mask_phone(phone) + +def mask_text(value: str, reveal: bool = False) -> str: + if reveal: + return value + return re.sub(r"\d{8,15}", lambda match: mask_phone(match.group(0)), value) + +def mask_data(value, reveal: bool = False): + if reveal: + return value + if isinstance(value, dict): + return {key: mask_data(val, reveal=reveal) for key, val in value.items()} + if isinstance(value, list): + return [mask_data(item, reveal=reveal) for item in value] + if isinstance(value, str): + return mask_text(value, reveal=reveal) + return value + +def photo_filename(phone: str, reveal: bool = False) -> str: + return f"whatsapp_{display_phone(phone, reveal=reveal)}.jpg" + def show_menu(): print(Fore.CYAN + "πŸ” Select the type of query:" + Style.RESET_ALL) print() @@ -78,7 +112,7 @@ def save_b64(b64_str: str, path: str) -> bool: except Exception: return False -def process_profile_picture(phone: str, api_key: str): +def process_profile_picture(phone: str, api_key: str, reveal: bool = False): """Process profile photo""" try: resp = fetch_endpoint(phone, api_key, "/wspic/b64", "GET") @@ -90,7 +124,7 @@ def process_profile_picture(phone: str, api_key: str): ctype = resp.headers.get("Content-Type", "") if resp.status_code != 200: - print("❌ Server Error:", resp.text.strip()) + print("❌ Server Error:", mask_text(resp.text.strip(), reveal=reveal)) return body = resp.text.strip() @@ -99,13 +133,13 @@ def process_profile_picture(phone: str, api_key: str): try: data = resp.json() except Exception: - print("❌ Could not parse JSON.\n", body) + print("❌ Could not parse JSON.\n", mask_text(body, reveal=reveal)) return b64 = data.get("data") or data.get("image") or data.get("base64") if not b64: - print("ℹ️ No base64 field in response:", data) + print("ℹ️ No base64 field in response:", mask_data(data, reveal=reveal)) return - fname = f"whatsapp_{phone}.jpg" + fname = photo_filename(phone, reveal=reveal) if save_b64(b64, fname): print(f"βœ… Image saved as {fname}") else: @@ -115,13 +149,13 @@ def process_profile_picture(phone: str, api_key: str): if "no profile picture" in lo or "does not have a profile picture" in lo: print("ℹ️ This user has no profile picture (or it’s hidden).") else: - fname = f"whatsapp_{phone}.jpg" + fname = photo_filename(phone, reveal=reveal) if save_b64(body, fname): print(f"βœ… Image saved as {fname}") else: - print(f"ℹ️ Text response:\n{body}") + print(f"ℹ️ Text response:\n{mask_text(body, reveal=reveal)}") -def process_user_status(phone: str, api_key: str): +def process_user_status(phone: str, api_key: str, reveal: bool = False): """Process user status""" try: resp = fetch_endpoint(phone, api_key, "/about", "GET") @@ -131,13 +165,13 @@ def process_user_status(phone: str, api_key: str): print("HTTP:", resp.status_code) if resp.status_code != 200: - print("❌ Server Error:", resp.text.strip()) + print("❌ Server Error:", mask_text(resp.text.strip(), reveal=reveal)) return try: data = resp.json() print(f"\nπŸ“Š {Fore.CYAN}User Status:{Style.RESET_ALL}") - print(f" πŸ“± Number: {phone}") + print(f" πŸ“± Number: {display_phone(phone, reveal=reveal)}") if "about" in data: if data['about'] and data['about'].strip(): @@ -152,9 +186,9 @@ def process_user_status(phone: str, api_key: str): status = "🟒 Online" if data['is_online'] else "πŸ”΄ Offline" print(f" {status}") except Exception: - print("ℹ️ Text response:", resp.text.strip()) + print("ℹ️ Text response:", mask_text(resp.text.strip(), reveal=reveal)) -def process_business_verification(phone: str, api_key: str): +def process_business_verification(phone: str, api_key: str, reveal: bool = False): """Process WhatsApp Business verification""" try: headers = { @@ -171,13 +205,13 @@ def process_business_verification(phone: str, api_key: str): print("HTTP:", resp.status_code) if resp.status_code != 200: - print("❌ Server Error:", resp.text.strip()) + print("❌ Server Error:", mask_text(resp.text.strip(), reveal=reveal)) return try: data = resp.json() print(f"\n🏒 {Fore.CYAN}WhatsApp Business Verification:{Style.RESET_ALL}") - print(f" πŸ“± Number: {phone}") + print(f" πŸ“± Number: {display_phone(phone, reveal=reveal)}") if isinstance(data, list) and len(data) > 0: business_data = data[0] @@ -194,13 +228,13 @@ def process_business_verification(phone: str, api_key: str): if "verifiedName" in business_data and business_data['verifiedName']: print(f" πŸͺ Verified Name: {business_data['verifiedName']}") if "query" in business_data: - print(f" πŸ” Query: {business_data['query']}") + print(f" πŸ” Query: {mask_text(str(business_data['query']), reveal=reveal)}") else: print(" ℹ️ No business info found") except Exception: - print("ℹ️ Text response:", resp.text.strip()) + print("ℹ️ Text response:", mask_text(resp.text.strip(), reveal=reveal)) -def process_device_info(phone: str, api_key: str): +def process_device_info(phone: str, api_key: str, reveal: bool = False): """Process linked device information""" try: resp = fetch_endpoint(phone, api_key, "/devices", "GET") @@ -210,13 +244,13 @@ def process_device_info(phone: str, api_key: str): print("HTTP:", resp.status_code) if resp.status_code != 200: - print("❌ Server Error:", resp.text.strip()) + print("❌ Server Error:", mask_text(resp.text.strip(), reveal=reveal)) return try: data = resp.json() print(f"\nπŸ“± {Fore.CYAN}Device Information:{Style.RESET_ALL}") - print(f" πŸ“ž Number: {phone}") + print(f" πŸ“ž Number: {display_phone(phone, reveal=reveal)}") if "devices" in data: if isinstance(data['devices'], list) and data['devices']: @@ -241,9 +275,9 @@ def process_device_info(phone: str, api_key: str): if "devices" not in data and "message" not in data: print(" ℹ️ No device information found") except Exception: - print("ℹ️ Text response:", resp.text.strip()) + print("ℹ️ Text response:", mask_text(resp.text.strip(), reveal=reveal)) -def process_osint_info(phone: str, api_key: str): +def process_osint_info(phone: str, api_key: str, reveal: bool = False): """Process full OSINT data""" try: resp = fetch_endpoint(phone, api_key, "/wspic/dck", "GET") @@ -253,13 +287,13 @@ def process_osint_info(phone: str, api_key: str): print("HTTP:", resp.status_code) if resp.status_code != 200: - print("❌ Server Error:", resp.text.strip()) + print("❌ Server Error:", mask_text(resp.text.strip(), reveal=reveal)) return try: data = resp.json() print(f"\nπŸ” {Fore.CYAN}Full OSINT Information:{Style.RESET_ALL}") - print(f" πŸ“± Number: {phone}") + print(f" πŸ“± Number: {display_phone(phone, reveal=reveal)}") if "verification_status" in data: print(f" βœ… Verification: {data['verification_status']}") if "last_seen" in data: @@ -269,11 +303,11 @@ def process_osint_info(phone: str, api_key: str): if "osint_data" in data: print(" πŸ“Š Additional OSINT data available") print(f"\nπŸ“„ {Fore.YELLOW}Complete Data:{Style.RESET_ALL}") - print(json.dumps(data, indent=2, ensure_ascii=False)) + print(json.dumps(mask_data(data, reveal=reveal), indent=2, ensure_ascii=False)) except Exception: - print("ℹ️ Text response:", resp.text.strip()) + print("ℹ️ Text response:", mask_text(resp.text.strip(), reveal=reveal)) -def process_privacy_settings(phone: str, api_key: str): +def process_privacy_settings(phone: str, api_key: str, reveal: bool = False): """Process privacy settings""" try: resp = fetch_endpoint(phone, api_key, "/privacy", "GET") @@ -283,13 +317,13 @@ def process_privacy_settings(phone: str, api_key: str): print("HTTP:", resp.status_code) if resp.status_code != 200: - print("❌ Server Error:", resp.text.strip()) + print("❌ Server Error:", mask_text(resp.text.strip(), reveal=reveal)) return try: data = resp.json() print(f"\nπŸ”’ {Fore.CYAN}Privacy Settings:{Style.RESET_ALL}") - print(f" πŸ“± Number: {phone}") + print(f" πŸ“± Number: {display_phone(phone, reveal=reveal)}") if "privacy" in data: print(f" πŸ”’ Privacy Settings: {data['privacy']}") elif "profile_visibility" in data: @@ -304,10 +338,11 @@ def process_privacy_settings(phone: str, api_key: str): if "profile_picture" in data: print(f" πŸ–ΌοΈ Profile Photo Visibility: {data['profile_picture']}") except Exception: - print("ℹ️ Text response:", resp.text.strip()) + print("ℹ️ Text response:", mask_text(resp.text.strip(), reveal=reveal)) def main(): show_banner() + reveal = "--reveal" in sys.argv[1:] api_key = os.getenv("RAPIDAPI_KEY") if not api_key: print("❌ RAPIDAPI_KEY not found in .env") @@ -329,21 +364,21 @@ def main(): return print(f"\nπŸ” {Fore.GREEN}Processing request...{Style.RESET_ALL}") - print(f"πŸ“± Number: {phone}") + print(f"πŸ“± Number: {display_phone(phone, reveal=reveal)}") print(f"🎯 Query: {ENDPOINTS[choice]['name']}\n") if choice == "1": - process_profile_picture(phone, api_key) + process_profile_picture(phone, api_key, reveal=reveal) elif choice == "2": - process_user_status(phone, api_key) + process_user_status(phone, api_key, reveal=reveal) elif choice == "3": - process_business_verification(phone, api_key) + process_business_verification(phone, api_key, reveal=reveal) elif choice == "4": - process_device_info(phone, api_key) + process_device_info(phone, api_key, reveal=reveal) elif choice == "5": - process_osint_info(phone, api_key) + process_osint_info(phone, api_key, reveal=reveal) elif choice == "6": - process_privacy_settings(phone, api_key) + process_privacy_settings(phone, api_key, reveal=reveal) print(f"\nβœ… {Fore.GREEN}Query completed.{Style.RESET_ALL}")