Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 71 additions & 36 deletions whatsapp-osint.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import re
import base64
import sys
import requests
import json
import time
Expand Down Expand Up @@ -51,6 +52,39 @@ def sanitize_phone(raw: str) -> str:
def is_valid_phone(p: str) -> bool:
return p.isdigit() and 8 <= len(p) <= 15

def mask_phone(phone: str) -> str:
digits = sanitize_phone(phone)
if not digits:
return phone
if len(digits) <= 4:
return "*" * len(digits)
prefix_len = min(3, max(1, len(digits) - 4))
suffix_len = 4 if len(digits) > 7 else 3
masked_len = max(len(digits) - prefix_len - suffix_len, 1)
return f"{digits[:prefix_len]}{'*' * masked_len}{digits[-suffix_len:]}"

def display_phone(phone: str, reveal: bool = False) -> str:
return phone if reveal else mask_phone(phone)

def mask_text(value: str, reveal: bool = False) -> str:
if reveal:
return value
return re.sub(r"\d{8,15}", lambda match: mask_phone(match.group(0)), value)

def mask_data(value, reveal: bool = False):
if reveal:
return value
if isinstance(value, dict):
return {key: mask_data(val, reveal=reveal) for key, val in value.items()}
if isinstance(value, list):
return [mask_data(item, reveal=reveal) for item in value]
if isinstance(value, str):
return mask_text(value, reveal=reveal)
return value

def photo_filename(phone: str, reveal: bool = False) -> str:
return f"whatsapp_{display_phone(phone, reveal=reveal)}.jpg"

def show_menu():
print(Fore.CYAN + "🔍 Select the type of query:" + Style.RESET_ALL)
print()
Expand Down Expand Up @@ -78,7 +112,7 @@ def save_b64(b64_str: str, path: str) -> bool:
except Exception:
return False

def process_profile_picture(phone: str, api_key: str):
def process_profile_picture(phone: str, api_key: str, reveal: bool = False):
"""Process profile photo"""
try:
resp = fetch_endpoint(phone, api_key, "/wspic/b64", "GET")
Expand All @@ -90,7 +124,7 @@ def process_profile_picture(phone: str, api_key: str):
ctype = resp.headers.get("Content-Type", "")

if resp.status_code != 200:
print("❌ Server Error:", resp.text.strip())
print("❌ Server Error:", mask_text(resp.text.strip(), reveal=reveal))
return

body = resp.text.strip()
Expand All @@ -99,13 +133,13 @@ def process_profile_picture(phone: str, api_key: str):
try:
data = resp.json()
except Exception:
print("❌ Could not parse JSON.\n", body)
print("❌ Could not parse JSON.\n", mask_text(body, reveal=reveal))
return
b64 = data.get("data") or data.get("image") or data.get("base64")
if not b64:
print("ℹ️ No base64 field in response:", data)
print("ℹ️ No base64 field in response:", mask_data(data, reveal=reveal))
return
fname = f"whatsapp_{phone}.jpg"
fname = photo_filename(phone, reveal=reveal)
if save_b64(b64, fname):
print(f"✅ Image saved as {fname}")
else:
Expand All @@ -115,13 +149,13 @@ def process_profile_picture(phone: str, api_key: str):
if "no profile picture" in lo or "does not have a profile picture" in lo:
print("ℹ️ This user has no profile picture (or it’s hidden).")
else:
fname = f"whatsapp_{phone}.jpg"
fname = photo_filename(phone, reveal=reveal)
if save_b64(body, fname):
print(f"✅ Image saved as {fname}")
else:
print(f"ℹ️ Text response:\n{body}")
print(f"ℹ️ Text response:\n{mask_text(body, reveal=reveal)}")

def process_user_status(phone: str, api_key: str):
def process_user_status(phone: str, api_key: str, reveal: bool = False):
"""Process user status"""
try:
resp = fetch_endpoint(phone, api_key, "/about", "GET")
Expand All @@ -131,13 +165,13 @@ def process_user_status(phone: str, api_key: str):

print("HTTP:", resp.status_code)
if resp.status_code != 200:
print("❌ Server Error:", resp.text.strip())
print("❌ Server Error:", mask_text(resp.text.strip(), reveal=reveal))
return

try:
data = resp.json()
print(f"\n📊 {Fore.CYAN}User Status:{Style.RESET_ALL}")
print(f" 📱 Number: {phone}")
print(f" 📱 Number: {display_phone(phone, reveal=reveal)}")

if "about" in data:
if data['about'] and data['about'].strip():
Expand All @@ -152,9 +186,9 @@ def process_user_status(phone: str, api_key: str):
status = "🟢 Online" if data['is_online'] else "🔴 Offline"
print(f" {status}")
except Exception:
print("ℹ️ Text response:", resp.text.strip())
print("ℹ️ Text response:", mask_text(resp.text.strip(), reveal=reveal))

def process_business_verification(phone: str, api_key: str):
def process_business_verification(phone: str, api_key: str, reveal: bool = False):
"""Process WhatsApp Business verification"""
try:
headers = {
Expand All @@ -171,13 +205,13 @@ def process_business_verification(phone: str, api_key: str):

print("HTTP:", resp.status_code)
if resp.status_code != 200:
print("❌ Server Error:", resp.text.strip())
print("❌ Server Error:", mask_text(resp.text.strip(), reveal=reveal))
return

try:
data = resp.json()
print(f"\n🏢 {Fore.CYAN}WhatsApp Business Verification:{Style.RESET_ALL}")
print(f" 📱 Number: {phone}")
print(f" 📱 Number: {display_phone(phone, reveal=reveal)}")

if isinstance(data, list) and len(data) > 0:
business_data = data[0]
Expand All @@ -194,13 +228,13 @@ def process_business_verification(phone: str, api_key: str):
if "verifiedName" in business_data and business_data['verifiedName']:
print(f" 🏪 Verified Name: {business_data['verifiedName']}")
if "query" in business_data:
print(f" 🔍 Query: {business_data['query']}")
print(f" 🔍 Query: {mask_text(str(business_data['query']), reveal=reveal)}")
else:
print(" ℹ️ No business info found")
except Exception:
print("ℹ️ Text response:", resp.text.strip())
print("ℹ️ Text response:", mask_text(resp.text.strip(), reveal=reveal))

def process_device_info(phone: str, api_key: str):
def process_device_info(phone: str, api_key: str, reveal: bool = False):
"""Process linked device information"""
try:
resp = fetch_endpoint(phone, api_key, "/devices", "GET")
Expand All @@ -210,13 +244,13 @@ def process_device_info(phone: str, api_key: str):

print("HTTP:", resp.status_code)
if resp.status_code != 200:
print("❌ Server Error:", resp.text.strip())
print("❌ Server Error:", mask_text(resp.text.strip(), reveal=reveal))
return

try:
data = resp.json()
print(f"\n📱 {Fore.CYAN}Device Information:{Style.RESET_ALL}")
print(f" 📞 Number: {phone}")
print(f" 📞 Number: {display_phone(phone, reveal=reveal)}")

if "devices" in data:
if isinstance(data['devices'], list) and data['devices']:
Expand All @@ -241,9 +275,9 @@ def process_device_info(phone: str, api_key: str):
if "devices" not in data and "message" not in data:
print(" ℹ️ No device information found")
except Exception:
print("ℹ️ Text response:", resp.text.strip())
print("ℹ️ Text response:", mask_text(resp.text.strip(), reveal=reveal))

def process_osint_info(phone: str, api_key: str):
def process_osint_info(phone: str, api_key: str, reveal: bool = False):
"""Process full OSINT data"""
try:
resp = fetch_endpoint(phone, api_key, "/wspic/dck", "GET")
Expand All @@ -253,13 +287,13 @@ def process_osint_info(phone: str, api_key: str):

print("HTTP:", resp.status_code)
if resp.status_code != 200:
print("❌ Server Error:", resp.text.strip())
print("❌ Server Error:", mask_text(resp.text.strip(), reveal=reveal))
return

try:
data = resp.json()
print(f"\n🔍 {Fore.CYAN}Full OSINT Information:{Style.RESET_ALL}")
print(f" 📱 Number: {phone}")
print(f" 📱 Number: {display_phone(phone, reveal=reveal)}")
if "verification_status" in data:
print(f" ✅ Verification: {data['verification_status']}")
if "last_seen" in data:
Expand All @@ -269,11 +303,11 @@ def process_osint_info(phone: str, api_key: str):
if "osint_data" in data:
print(" 📊 Additional OSINT data available")
print(f"\n📄 {Fore.YELLOW}Complete Data:{Style.RESET_ALL}")
print(json.dumps(data, indent=2, ensure_ascii=False))
print(json.dumps(mask_data(data, reveal=reveal), indent=2, ensure_ascii=False))
except Exception:
print("ℹ️ Text response:", resp.text.strip())
print("ℹ️ Text response:", mask_text(resp.text.strip(), reveal=reveal))

def process_privacy_settings(phone: str, api_key: str):
def process_privacy_settings(phone: str, api_key: str, reveal: bool = False):
"""Process privacy settings"""
try:
resp = fetch_endpoint(phone, api_key, "/privacy", "GET")
Expand All @@ -283,13 +317,13 @@ def process_privacy_settings(phone: str, api_key: str):

print("HTTP:", resp.status_code)
if resp.status_code != 200:
print("❌ Server Error:", resp.text.strip())
print("❌ Server Error:", mask_text(resp.text.strip(), reveal=reveal))
return

try:
data = resp.json()
print(f"\n🔒 {Fore.CYAN}Privacy Settings:{Style.RESET_ALL}")
print(f" 📱 Number: {phone}")
print(f" 📱 Number: {display_phone(phone, reveal=reveal)}")
if "privacy" in data:
print(f" 🔒 Privacy Settings: {data['privacy']}")
elif "profile_visibility" in data:
Expand All @@ -304,10 +338,11 @@ def process_privacy_settings(phone: str, api_key: str):
if "profile_picture" in data:
print(f" 🖼️ Profile Photo Visibility: {data['profile_picture']}")
except Exception:
print("ℹ️ Text response:", resp.text.strip())
print("ℹ️ Text response:", mask_text(resp.text.strip(), reveal=reveal))

def main():
show_banner()
reveal = "--reveal" in sys.argv[1:]
api_key = os.getenv("RAPIDAPI_KEY")
if not api_key:
print("❌ RAPIDAPI_KEY not found in .env")
Expand All @@ -329,21 +364,21 @@ def main():
return

print(f"\n🔍 {Fore.GREEN}Processing request...{Style.RESET_ALL}")
print(f"📱 Number: {phone}")
print(f"📱 Number: {display_phone(phone, reveal=reveal)}")
print(f"🎯 Query: {ENDPOINTS[choice]['name']}\n")

if choice == "1":
process_profile_picture(phone, api_key)
process_profile_picture(phone, api_key, reveal=reveal)
elif choice == "2":
process_user_status(phone, api_key)
process_user_status(phone, api_key, reveal=reveal)
elif choice == "3":
process_business_verification(phone, api_key)
process_business_verification(phone, api_key, reveal=reveal)
elif choice == "4":
process_device_info(phone, api_key)
process_device_info(phone, api_key, reveal=reveal)
elif choice == "5":
process_osint_info(phone, api_key)
process_osint_info(phone, api_key, reveal=reveal)
elif choice == "6":
process_privacy_settings(phone, api_key)
process_privacy_settings(phone, api_key, reveal=reveal)

print(f"\n✅ {Fore.GREEN}Query completed.{Style.RESET_ALL}")

Expand Down