diff --git a/core/cli.py b/core/cli.py index e1c8fce..39d7882 100644 --- a/core/cli.py +++ b/core/cli.py @@ -1,6 +1,8 @@ import logging import multiprocessing import sys +import time +import queue as _queue from multiprocessing.pool import Pool from typing import Dict, List, Optional, Tuple @@ -135,37 +137,136 @@ def search_pubkey( ) logging.info(f"Using {gpu_counts} OpenCL device(s)") - result_count = 0 + FLUSH_INTERVAL = 5.0 + PRINT_RATE_THRESHOLD = 5.0 # keys/sec; above this, suppress per-key logs + + found_count = 0 + saved_total = 0 + pending_results: List = [] + last_flush = time.time() + last_print_time = time.time() + keys_since_last_print = 0 + high_throughput = False + with multiprocessing.Manager() as manager: with Pool(processes=gpu_counts) as pool: kernel_source = load_kernel_source( prefix_patterns, suffix_patterns, is_case_sensitive ) lock = manager.Lock() - while result_count < count: - stop_flag = manager.Value("i", 0) - results = pool.starmap( - multi_gpu_init, - [ + result_queue = manager.Queue() + stop_flag = manager.Value("i", 0) + + async_results = [] + for x in range(gpu_counts): + async_results.append( + pool.apply_async( + multi_gpu_init, ( x, HostSetting(kernel_source, iteration_bits), gpu_counts, stop_flag, lock, + result_queue, chosen_devices, - ) - for x in range(gpu_counts) - ], + ), + ) ) - result_count += save_result( - results, - output_dir, - prefix_patterns, - suffix_patterns, - pattern_dirs, - is_case_sensitive, + + while found_count < count: + try: + res = result_queue.get(timeout=1.0) + except _queue.Empty: + res = None + now = time.time() + if isinstance(res, (list, tuple, bytearray, bytes)) and len(res) > 0 and res[0]: + pending_results.append(list(res)) + found_count += 1 + keys_since_last_print += 1 + + elapsed_since_print = now - last_print_time + if elapsed_since_print >= 1.0: + rate = keys_since_last_print / elapsed_since_print + high_throughput = rate > PRINT_RATE_THRESHOLD + if high_throughput: + logging.info( + 
f"Progress: {found_count}/{count} ({rate:.1f} keys/sec)" + ) + else: + logging.info(f"Progress: {found_count}/{count}") + last_print_time = now + keys_since_last_print = 0 + elif found_count == count: + rate = keys_since_last_print / elapsed_since_print if elapsed_since_print > 0 else 0 + logging.info( + f"Progress: {found_count}/{count} ({rate:.1f} keys/sec)" + ) + + if (now - last_flush) >= FLUSH_INTERVAL or found_count >= count: + if pending_results: + saved = save_result( + pending_results, output_dir, + prefix_patterns, suffix_patterns, + pattern_dirs, is_case_sensitive, + quiet=high_throughput, + ) + saved_total += saved + pending_results.clear() + last_flush = now + + with lock: + stop_flag.value = 1 + + logging.info("Signaled workers to stop; draining remaining results...") + while True: + try: + res = result_queue.get(timeout=0.5) + except _queue.Empty: + res = None + + now = time.time() + if isinstance(res, (list, tuple, bytearray, bytes)) and len(res) > 0 and res[0]: + pending_results.append(list(res)) + + if (now - last_flush) >= FLUSH_INTERVAL and pending_results: + saved = save_result( + pending_results, output_dir, + prefix_patterns, suffix_patterns, + pattern_dirs, is_case_sensitive, + ) + saved_total += saved + pending_results.clear() + last_flush = now + + all_finished = all(a.ready() for a in async_results) + if all_finished: + while True: + try: + res = result_queue.get_nowait() + except _queue.Empty: + break + if isinstance(res, (list, tuple, bytearray, bytes)) and len(res) > 0 and res[0]: + pending_results.append(list(res)) + break + + if pending_results: + saved = save_result( + pending_results, output_dir, + prefix_patterns, suffix_patterns, + pattern_dirs, is_case_sensitive, + quiet=high_throughput, ) + saved_total += saved + pending_results.clear() + + for a in async_results: + try: + a.get(timeout=10) + except Exception: + pass + + logging.info(f"Search finished. 
Total matches found: {found_count}, total saved: {saved_total}") @cli.command(context_settings={"show_default": True}) diff --git a/core/searcher.py b/core/searcher.py index 9e06dbf..ffd08c1 100644 --- a/core/searcher.py +++ b/core/searcher.py @@ -67,7 +67,7 @@ def find(self, log_stats: bool = True) -> bytearray: cl.enqueue_copy(self.command_queue, self.memobj_key32, self.setting.key32) global_work_size = self.setting.global_work_size // self.gpu_chunks local_size = self.setting.local_work_size - global_size = ((global_work_size + local_size - 1) // local_size) * local_size # align global size and local size + global_size = ((global_work_size + local_size - 1) // local_size) * local_size cl.enqueue_nd_range_kernel( self.command_queue, self.kernel, @@ -76,14 +76,20 @@ def find(self, log_stats: bool = True) -> bytearray: ) self.command_queue.flush() self.setting.increase_key32() - if self.prev_time is not None and self.is_nvidia: - time.sleep(self.prev_time * 0.98) cl.enqueue_copy(self.command_queue, self.output, self.memobj_output).wait() self.prev_time = time.time() - start_time if log_stats: logging.info( f"GPU {self.display_index} Speed: {global_work_size / ((time.time() - start_time) * 1e6):.2f} MH/s" ) + + # If a match was found, clear the GPU output buffer so we don't report it again + if self.output[0]: + result = bytearray(self.output) + self.output[:] = bytearray(33) + cl.enqueue_copy(self.command_queue, self.memobj_output, self.output).wait() + return result + return self.output @@ -93,8 +99,13 @@ def multi_gpu_init( gpu_counts: int, stop_flag, lock, + result_queue, chosen_devices: Optional[Tuple[int, List[int]]] = None, -) -> List: +) -> None: + """ + Long-running worker for a single GPU. Pushes matches into result_queue + and exits when stop_flag.value is set. 
+ """ try: searcher = Searcher( kernel_source=setting.kernel_source, @@ -107,21 +118,23 @@ def multi_gpu_init( while True: result = searcher.find(i == 0) if result[0]: - with lock: - if not stop_flag.value: - stop_flag.value = 1 - return list(result) + try: + result_queue.put(list(result)) + except Exception: + logging.exception("Failed to put result into queue") if time.time() - st > max(gpu_counts, 1): i = 0 st = time.time() with lock: if stop_flag.value: - return list(result) + break else: i += 1 + if stop_flag.value: + break except Exception as e: logging.exception(e) - return [0] + return def _resolve_output_dir( @@ -160,14 +173,14 @@ def save_result( ends_with: Tuple[str, ...] = (), pattern_dirs: Optional[Dict[str, str]] = None, is_case_sensitive: bool = True, + quiet: bool = False, ) -> int: - from core.utils.crypto import get_public_key_from_private_bytes, save_keypair + from core.utils.crypto import get_public_key_from_private_bytes, save_keypair, _seen_pubkeys - result_count = 0 + before_count = len(_seen_pubkeys) for output in outputs: if not output[0]: continue - result_count += 1 pv_bytes = bytes(output[1:]) target_dir = output_dir if pattern_dirs: @@ -176,5 +189,5 @@ def save_result( pubkey, output_dir, starts_with, ends_with, pattern_dirs, is_case_sensitive, ) - save_keypair(pv_bytes, target_dir) - return result_count + save_keypair(pv_bytes, target_dir, quiet=quiet) + return len(_seen_pubkeys) - before_count diff --git a/core/utils/crypto.py b/core/utils/crypto.py index 658c47a..c9ed064 100644 --- a/core/utils/crypto.py +++ b/core/utils/crypto.py @@ -1,10 +1,14 @@ import json import logging from pathlib import Path +from typing import Set from base58 import b58encode from nacl.signing import SigningKey +# In-memory set to track saved keys within this run +_seen_pubkeys: Set[str] = set() + def get_public_key_from_private_bytes(pv_bytes: bytes) -> str: """ @@ -15,15 +19,30 @@ def get_public_key_from_private_bytes(pv_bytes: bytes) -> str: return 
def save_keypair(pv_bytes: bytes, output_dir: str, quiet: bool = False) -> str:
    """
    Persist a 32-byte private key as a Solana-style keypair JSON file.

    The file is named ``<pubkey>.json`` and contains the 64-byte
    private+public key concatenation serialized as a JSON list of ints.
    Deduplicates within the current run via the module-level
    ``_seen_pubkeys`` set, and across runs via exclusive file creation.

    Args:
        pv_bytes: Raw 32-byte Ed25519 private key seed.
        output_dir: Directory to write the keypair file into
            (created if missing).
        quiet: Suppress the per-key "Found" log line (callers pass
            True under high throughput to avoid log spam).

    Returns:
        The base58-encoded public key, or "" if ``pv_bytes`` has an
        invalid length.
    """
    if len(pv_bytes) != 32:
        logging.warning(f"Invalid private key length: {len(pv_bytes)} (expected 32)")
        return ""

    pv = SigningKey(pv_bytes)
    pb_bytes = bytes(pv.verify_key)
    pubkey = b58encode(pb_bytes).decode()

    # Fast path: already handled earlier in this run.
    if pubkey in _seen_pubkeys:
        return pubkey
    _seen_pubkeys.add(pubkey)

    Path(output_dir).mkdir(parents=True, exist_ok=True)
    file_path = Path(output_dir) / f"{pubkey}.json"

    # Exclusive create ("x") makes the existence check and the write a
    # single atomic operation, closing the TOCTOU window of a separate
    # .exists() probe; a key saved by a previous run (or a concurrent
    # process) becomes a cheap no-op instead of a silent overwrite.
    try:
        with file_path.open("x") as fh:
            fh.write(json.dumps(list(pv_bytes + pb_bytes)))
    except FileExistsError:
        return pubkey

    if not quiet:
        logging.info(f"Found: {pubkey}")
    return pubkey