diff --git a/README.md b/README.md index 98c13a7..8605c40 100644 --- a/README.md +++ b/README.md @@ -49,15 +49,20 @@ Optionally, scan a specific category or single module: user-scanner -u -c dev user-scanner -l # Lists all available modules user-scanner -u -m github -user-scanner -u -p +``` +Also, the output format can be specified:
+\* Errors and warnings will only appear when the format is set to "console" +```bash +user-scanner -u -o console #Default format +user-scanner -u -o csv +user-scanner -u -o json ``` Generate multiple username variations by appending a suffix: ```bash user-scanner -u -p - ``` Optionally, scan a specific category or single module with limit: diff --git a/user_scanner/__main__.py b/user_scanner/__main__.py index c50bdb9..eeff161 100644 --- a/user_scanner/__main__.py +++ b/user_scanner/__main__.py @@ -1,27 +1,13 @@ import argparse import time import re -from user_scanner.core.orchestrator import run_checks, load_modules , generate_permutations, load_categories +from user_scanner.cli import printer +from user_scanner.core.orchestrator import load_modules , generate_permutations, load_categories from colorama import Fore, Style -from .cli import banner -from .cli.banner import print_banner +from user_scanner.cli.banner import print_banner MAX_PERMUTATIONS_LIMIT = 100 # To prevent excessive generation -def list_modules(category=None): - categories = load_categories() - categories_to_list = [category] if category else categories.keys() - - for cat_name in categories_to_list: - path = categories[cat_name] - modules = load_modules(path) - print(Fore.MAGENTA + - f"\n== {cat_name.upper()} SITES =={Style.RESET_ALL}") - for module in modules: - site_name = module.__name__.split(".")[-1] - print(f" - {site_name}") - - def main(): parser = argparse.ArgumentParser( prog="user-scanner", @@ -54,29 +40,37 @@ def main(): parser.add_argument( "-d", "--delay",type=float,default=0,help="Delay in seconds between requests (recommended: 1-2 seconds)" ) + + parser.add_argument( + "-o", "--output-format", choices=["console", "csv", "json"], default="console", help="Specify output format (default: console)" + ) args = parser.parse_args() + Printer = printer.Printer(args.output_format) + if args.list: - list_modules(args.category) + Printer.print_modules(args.category) return if not args.username: parser.print_help() return - - # Special username checks before run - if (args.module == "x" or args.category == "social"): - if re.search(r"[^a-zA-Z0-9._-]", args.username): - print( - Fore.RED + f"[!] Username '{args.username}' contains unsupported special characters. X (Twitter) doesn't support these." + Style.RESET_ALL) - if (args.module == "bluesky" or args.category == "social"): - if re.search(r"[^a-zA-Z0-9\.-]", args.username): - print( - Fore.RED + f"[!] Username '{args.username}' contains unsupported special characters. Bluesky will throw error. (Supported: only hyphens and digits)" + Style.RESET_ALL + "\n") - print_banner() + - if args.permute and args.delay == 0: + if Printer.is_console: + print_banner() + # Special username checks before run + if (args.module == "x" or args.category == "social"): + if re.search(r"[^a-zA-Z0-9._-]", args.username): + print( + Fore.RED + f"[!] Username '{args.username}' contains unsupported special characters. X (Twitter) doesn't support these." + Style.RESET_ALL) + if (args.module == "bluesky" or args.category == "social"): + if re.search(r"[^a-zA-Z0-9\.-]", args.username): + print( + Fore.RED + f"[!] Username '{args.username}' contains unsupported special characters. Bluesky will throw error. (Supported: only hyphens and digits)" + Style.RESET_ALL + "\n") + + if args.permute and args.delay == 0 and Printer.is_console: print( Fore.YELLOW + "[!] Warning: You're generating multiple usernames with NO delay between requests. " @@ -88,46 +82,51 @@ def main(): #Added permutation support , generate all possible permutation of given sequence. if args.permute: usernames = generate_permutations(args.username, args.permute , args.stop) - print(Fore.CYAN + f"[+] Generated {len(usernames)} username permutations" + Style.RESET_ALL) + if Printer.is_console: + print( + Fore.CYAN + f"[+] Generated {len(usernames)} username permutations" + Style.RESET_ALL) - - if args.module and "." in args.module: args.module = args.module.replace(".", "_") + def run_all_usernames(func, arg = None): + """ + Executes a function for all given usernames. + Made in order to simplify main() + """ + Printer.print_start() + for i, name in enumerate(usernames): + is_last = i == len(usernames) - 1 + if arg == None: + func(name, Printer, is_last) + else: + func(arg, name, Printer, is_last) + if args.delay > 0 and not is_last: + time.sleep(args.delay) + Printer.print_end() if args.module: # Single module search across all categories - found = False - for cat_path in load_categories().values(): - modules = load_modules(cat_path) + from user_scanner.core.orchestrator import run_module_single, find_module + modules = find_module(args.module) + + if len(modules) > 0: for module in modules: - site_name = module.__name__.split(".")[-1] - if site_name.lower() == args.module.lower(): - from user_scanner.core.orchestrator import run_module_single - for name in usernames: # <-- permutation support here - run_module_single(module, name) - if args.delay > 0: - time.sleep(args.delay) - found = True - if not found: + run_all_usernames(run_module_single, module) + else: print( Fore.RED + f"[!] Module '{args.module}' not found in any category." + Style.RESET_ALL) + elif args.category: # Category-wise scan category_package = load_categories().get(args.category) from user_scanner.core.orchestrator import run_checks_category - - for name in usernames: # <-- permutation support here - run_checks_category(category_package, name, args.verbose) - if args.delay > 0: - time.sleep(args.delay) + run_all_usernames(run_checks_category, category_package) + else: # Full scan - for name in usernames: - run_checks(name) - if args.delay > 0: - time.sleep(args.delay) + from user_scanner.core.orchestrator import run_checks + run_all_usernames(run_checks) if __name__ == "__main__": diff --git a/user_scanner/cli/printer.py b/user_scanner/cli/printer.py new file mode 100644 index 0000000..ea641ff --- /dev/null +++ b/user_scanner/cli/printer.py @@ -0,0 +1,150 @@ +from colorama import Fore, Style +from typing import Literal +from user_scanner.core.result import Result, Status + +INDENT = " " + +JSON_TEMPLATE = """{{ +\t"site_name": "{site_name}", +\t"username": "{username}", +\t"result": "{result}" +}}""".replace("\t", INDENT) + +JSON_TEMPLATE_ERROR = """{{ +\t"site_name": "{site_name}", +\t"username": "{username}", +\t"result": "Error", +\t"reason": "{reason}" +}}""".replace("\t", INDENT) + + +CSV_HEADER = "site_name,username,result,reason" +CSV_TEMPLATE = "{site_name},{username},{result},{reason}" + + +def indentate(msg: str, indent: int): + if indent <= 0: + return msg + tabs = INDENT * indent + return "\n".join([f"{tabs}{line}" for line in msg.split("\n")]) + + +class Printer: + def __init__(self, output_format: Literal["console", "csv", "json"]) -> None: + if not output_format in ["console", "csv", "json"]: + raise ValueError(f"Invalid output-format: {output_format}") + self.mode: str = output_format + self.indent: int = 0 + + @property + def is_console(self) -> bool: + return self.mode == "console" + + @property + def is_csv(self) -> bool: + return self.mode == "csv" + + @property + def is_json(self) -> bool: + return self.mode == "json" + + def print_start(self, json_char: str = "[") -> None: + if self.is_json: + self.indent += 1 + print(indentate(json_char, self.indent - 1)) + elif self.is_csv: + print(CSV_HEADER) + + def print_end(self, json_char: str = "]") -> None: + if not self.is_json: + return + self.indent = max(self.indent - 1, 0) + print(indentate(json_char, self.indent)) + + def get_result_output(self, site_name: str, username: str, result: Result) -> str: + if result == None: + result = Result.error("Invalid return value: None") + + if isinstance(result, int): + result = Result.from_number(result) + + match (result.status, self.mode): + case (Status.AVAILABLE, "console"): + return f"{INDENT}{Fore.GREEN}[✔] {site_name} ({username}): Available{Style.RESET_ALL}" + + case (Status.TAKEN, "console"): + return f"{INDENT}{Fore.RED}[✘] {site_name} ({username}): Taken{Style.RESET_ALL}" + + case (Status.ERROR, "console"): + reason = "" + if isinstance(result, Result) and result.has_reason(): + reason = f" ({result.get_reason()})" + return f"{INDENT}{Fore.YELLOW}[!] {site_name} ({username}): Error{reason}{Style.RESET_ALL}" + + case (Status.AVAILABLE, "json") | (Status.TAKEN, "json"): + return indentate(JSON_TEMPLATE, self.indent).format( + site_name=site_name, + username=username, + result=str(result.status) + ) + + case (Status.ERROR, "json"): + return indentate(JSON_TEMPLATE_ERROR, self.indent).format( + site_name=site_name, + username=username, + reason=result.get_reason() + ) + + case (_, "csv"): + return CSV_TEMPLATE.format( + site_name=site_name, + username=username, + result=str(result.status), + reason=result.get_reason() + ) + + return "" + + def print_modules(self, category: str | None = None): + from user_scanner.core.orchestrator import load_categories, load_modules + categories = load_categories() + categories_to_list = [category] if category else categories.keys() + + # Print the start + if self.is_json: + self.print_start("{") + elif self.is_csv: + print("category,site_name") + + for i, cat_name in enumerate(categories_to_list): + path = categories[cat_name] + modules = load_modules(path) + + # Print for each category + match self.mode: + case "console": + print(Fore.MAGENTA + + f"\n== {cat_name.upper()} SITES =={Style.RESET_ALL}") + case "json": + self.print_start(f"\"{cat_name}\": [") + + for j, module in enumerate(modules): + is_last = j == len(modules) - 1 + site_name = module.__name__.split(".")[-1].capitalize() + + # Print for each site name + match self.mode: + case "console": + print(f"{INDENT}- {site_name}") + case "json": + msg = f"\"{site_name}\"" + ("" if is_last else ",") + print(indentate(msg, self.indent)) + case "csv": + print(f"{cat_name},{site_name}") + + if self.is_json: + is_last = i == len(categories_to_list) - 1 + self.print_end("]" if is_last else "],") + + if self.is_json: + self.print_end("}") diff --git a/user_scanner/core/orchestrator.py b/user_scanner/core/orchestrator.py index 5e9e3cc..5e652bc 100644 --- a/user_scanner/core/orchestrator.py +++ b/user_scanner/core/orchestrator.py @@ -5,6 +5,7 @@ from itertools import permutations import httpx from pathlib import Path +from user_scanner.cli.printer import Printer from user_scanner.core.result import Result, AnyResult from typing import Callable, Dict, List @@ -39,7 +40,20 @@ def load_categories() -> Dict[str, Path]: return categories -def worker_single(module, username, i): +def find_module(name: str): + name = name.lower() + + matches = [ + module + for category_path in load_categories().values() + for module in load_modules(category_path) + if module.__name__.split(".")[-1].lower() == name + ] + + return matches + + +def worker_single(module, username: str, i: int, printer: Printer, last: bool = True) -> AnyResult: global print_queue func = next((getattr(module, f) for f in dir(module) @@ -52,49 +66,46 @@ def worker_single(module, username, i): if func: try: result = func(username) - reason = "" - - if isinstance(result, Result) and result.has_reason(): - reason = f" ({result.get_reason()})" - - if result == 1: - output = f" {Fore.GREEN}[✔] {site_name} ({username}): Available{Style.RESET_ALL}" - elif result == 0: - output = f" {Fore.RED}[✘] {site_name} ({username}): Taken{Style.RESET_ALL}" - else: - output = f" {Fore.YELLOW}[!] {site_name} ({username}): Error{reason}{Style.RESET_ALL}" + output = printer.get_result_output(site_name, username, result) + if last == False and printer.is_json: + output += "," except Exception as e: - output = f" {Fore.YELLOW}[!] {site_name}: Exception - {e}{Style.RESET_ALL}" + if Printer.is_console: + output = f" {Fore.YELLOW}[!] {site_name}: Exception - {e}{Style.RESET_ALL}" else: - output = f" {Fore.YELLOW}[!] {site_name} has no validate_ function{Style.RESET_ALL}" + if Printer.is_console: + output = f" {Fore.YELLOW}[!] {site_name} has no validate_ function{Style.RESET_ALL}" with lock: # Waits for in-order printing while i != print_queue: lock.wait() - print(output) + if output != "": + print(output) print_queue += 1 lock.notify_all() -def run_module_single(module, username): +def run_module_single(module, username: str, printer: Printer, last: bool = True) -> AnyResult: # Just executes as if it was a thread - worker_single(module, username, print_queue) + return worker_single(module, username, print_queue, printer, last) -def run_checks_category(category_path:Path, username:str, verbose=False): +def run_checks_category(category_path: Path, username: str, printer: Printer, last: bool = True): global print_queue modules = load_modules(category_path) - category_name = category_path.stem.capitalize() - print(f"{Fore.MAGENTA}== {category_name} SITES =={Style.RESET_ALL}") + if printer.is_console: + category_name = category_path.stem.capitalize() + print(f"\n{Fore.MAGENTA}== {category_name} SITES =={Style.RESET_ALL}") print_queue = 0 threads = [] for i, module in enumerate(modules): - t = threading.Thread(target=worker_single, args=(module, username, i)) + last_thread = last and (i == len(modules) - 1) + t = threading.Thread(target=worker_single, args=(module, username, i, printer, last_thread)) threads.append(t) t.start() @@ -102,13 +113,14 @@ def run_checks_category(category_path:Path, username:str, verbose=False): t.join() -def run_checks(username): - print(f"\n{Fore.CYAN} Checking username: {username}{Style.RESET_ALL}\n") - - for category_path in load_categories().values(): - run_checks_category(category_path, username) - print() +def run_checks(username: str, printer: Printer, last:bool = True): + if printer.is_console: + print(f"\n{Fore.CYAN} Checking username: {username}{Style.RESET_ALL}") + categories = list(load_categories().values()) + for i, category_path in enumerate(categories): + last_cat: int = last and (i == len(categories) - 1) + run_checks_category(category_path, username, printer, last_cat) def make_get_request(url: str, **kwargs) -> httpx.Response: """Simple wrapper to **httpx.get** that predefines headers and timeout""" diff --git a/user_scanner/core/result.py b/user_scanner/core/result.py index ea55b47..15365a3 100644 --- a/user_scanner/core/result.py +++ b/user_scanner/core/result.py @@ -18,6 +18,9 @@ class Status(Enum): AVAILABLE = 1 ERROR = 2 + def __str__(self): + return super().__str__().split(".")[1].capitalize() + class Result: def __init__(self, status: Status, reason: str | Exception | None = None):