From 8690c03aea0ad71321bd379a162624d1165ec508 Mon Sep 17 00:00:00 2001 From: sullystuff Date: Mon, 16 Mar 2026 08:15:51 -0600 Subject: [PATCH 1/2] feat: reuse compiled OpenCL program per worker, queue-based result collection Instead of restarting workers (and recompiling the OpenCL kernel) after every match, workers now run continuously and push results into a shared multiprocessing.Queue. The main process consumes the queue and flushes to disk in batches. Key changes: - multi_gpu_init is now a long-running loop that pushes to result_queue - GPU output buffer is cleared after each match so results aren't reported twice - Remove NVIDIA sleep throttle (prev_time * 0.98) that artificially slowed iteration on short-prefix searches - Deduplicate saved keypairs via in-memory set + file existence check - Validate private key length before attempting to derive pubkey - Flush results to disk at most once per 5 seconds to reduce I/O Co-Authored-By: Claude Opus 4.6 (1M context) --- core/cli.py | 110 ++++++++++++++++++++++++++++++++++++------- core/searcher.py | 40 ++++++++++------ core/utils/crypto.py | 20 +++++++- 3 files changed, 139 insertions(+), 31 deletions(-) diff --git a/core/cli.py b/core/cli.py index e1c8fce..fc6a345 100644 --- a/core/cli.py +++ b/core/cli.py @@ -1,6 +1,8 @@ import logging import multiprocessing import sys +import time +import queue as _queue from multiprocessing.pool import Pool from typing import Dict, List, Optional, Tuple @@ -135,37 +137,113 @@ def search_pubkey( ) logging.info(f"Using {gpu_counts} OpenCL device(s)") - result_count = 0 + FLUSH_INTERVAL = 5.0 + + found_count = 0 + saved_total = 0 + pending_results: List = [] + last_flush = time.time() + with multiprocessing.Manager() as manager: with Pool(processes=gpu_counts) as pool: kernel_source = load_kernel_source( prefix_patterns, suffix_patterns, is_case_sensitive ) lock = manager.Lock() - while result_count < count: - stop_flag = manager.Value("i", 0) - results = pool.starmap( - multi_gpu_init, - [ + result_queue = manager.Queue() + stop_flag = manager.Value("i", 0) + + async_results = [] + for x in range(gpu_counts): + async_results.append( + pool.apply_async( + multi_gpu_init, ( x, HostSetting(kernel_source, iteration_bits), gpu_counts, stop_flag, lock, + result_queue, chosen_devices, - ) - for x in range(gpu_counts) - ], + ), + ) ) - result_count += save_result( - results, - output_dir, - prefix_patterns, - suffix_patterns, - pattern_dirs, - is_case_sensitive, + + while found_count < count: + try: + res = result_queue.get(timeout=1.0) + except _queue.Empty: + res = None + now = time.time() + if isinstance(res, (list, tuple, bytearray, bytes)) and len(res) > 0 and res[0]: + pending_results.append(list(res)) + found_count += 1 + if found_count % 100 == 0 or found_count == count: + logging.info(f"Progress: {found_count}/{count} matches") + + if (now - last_flush) >= FLUSH_INTERVAL or found_count >= count: + if pending_results: + saved = save_result( + pending_results, output_dir, + prefix_patterns, suffix_patterns, + pattern_dirs, is_case_sensitive, + ) + saved_total += saved + pending_results.clear() + last_flush = now + + with lock: + stop_flag.value = 1 + + logging.info("Signaled workers to stop; draining remaining results...") + while True: + try: + res = result_queue.get(timeout=0.5) + except _queue.Empty: + res = None + + now = time.time() + if isinstance(res, (list, tuple, bytearray, bytes)) and len(res) > 0 and res[0]: + pending_results.append(list(res)) + + if (now - last_flush) >= FLUSH_INTERVAL and pending_results: + saved = save_result( + pending_results, output_dir, + prefix_patterns, suffix_patterns, + pattern_dirs, is_case_sensitive, + ) + saved_total += saved + pending_results.clear() + last_flush = now + + all_finished = all(a.ready() for a in async_results) + if all_finished: + while True: + try: + res = result_queue.get_nowait() + except _queue.Empty: + break + if isinstance(res, (list, tuple, bytearray, bytes)) and len(res) > 0 and res[0]: + pending_results.append(list(res)) + break + + if pending_results: + saved = save_result( + pending_results, output_dir, + prefix_patterns, suffix_patterns, + pattern_dirs, is_case_sensitive, ) + saved_total += saved + pending_results.clear() + + for a in async_results: + try: + a.get(timeout=10) + except Exception: + pass + + logging.info(f"Search finished. Total matches found: {found_count}, total saved: {saved_total}") @cli.command(context_settings={"show_default": True}) diff --git a/core/searcher.py b/core/searcher.py index 9e06dbf..e906b83 100644 --- a/core/searcher.py +++ b/core/searcher.py @@ -67,7 +67,7 @@ def find(self, log_stats: bool = True) -> bytearray: cl.enqueue_copy(self.command_queue, self.memobj_key32, self.setting.key32) global_work_size = self.setting.global_work_size // self.gpu_chunks local_size = self.setting.local_work_size - global_size = ((global_work_size + local_size - 1) // local_size) * local_size # align global size and local size + global_size = ((global_work_size + local_size - 1) // local_size) * local_size cl.enqueue_nd_range_kernel( self.command_queue, self.kernel, @@ -76,14 +76,20 @@ def find(self, log_stats: bool = True) -> bytearray: ) self.command_queue.flush() self.setting.increase_key32() - if self.prev_time is not None and self.is_nvidia: - time.sleep(self.prev_time * 0.98) cl.enqueue_copy(self.command_queue, self.output, self.memobj_output).wait() self.prev_time = time.time() - start_time if log_stats: logging.info( f"GPU {self.display_index} Speed: {global_work_size / ((time.time() - start_time) * 1e6):.2f} MH/s" ) + + # If a match was found, clear the GPU output buffer so we don't report it again + if self.output[0]: + result = bytearray(self.output) + self.output[:] = bytearray(33) + cl.enqueue_copy(self.command_queue, self.memobj_output, self.output).wait() + return result + return self.output @@ -93,8 +99,13 @@ def multi_gpu_init( gpu_counts: int, stop_flag, lock, + result_queue, chosen_devices: Optional[Tuple[int, List[int]]] = None, -) -> List: +) -> None: + """ + Long-running worker for a single GPU. Pushes matches into result_queue + and exits when stop_flag.value is set. + """ try: searcher = Searcher( kernel_source=setting.kernel_source, @@ -107,21 +118,23 @@ def multi_gpu_init( while True: result = searcher.find(i == 0) if result[0]: - with lock: - if not stop_flag.value: - stop_flag.value = 1 - return list(result) + try: + result_queue.put(list(result)) + except Exception: + logging.exception("Failed to put result into queue") if time.time() - st > max(gpu_counts, 1): i = 0 st = time.time() with lock: if stop_flag.value: - return list(result) + break else: i += 1 + if stop_flag.value: + break except Exception as e: logging.exception(e) - return [0] + return def _resolve_output_dir( @@ -161,13 +174,12 @@ def save_result( pattern_dirs: Optional[Dict[str, str]] = None, is_case_sensitive: bool = True, ) -> int: - from core.utils.crypto import get_public_key_from_private_bytes, save_keypair + from core.utils.crypto import get_public_key_from_private_bytes, save_keypair, _seen_pubkeys - result_count = 0 + before_count = len(_seen_pubkeys) for output in outputs: if not output[0]: continue - result_count += 1 pv_bytes = bytes(output[1:]) target_dir = output_dir if pattern_dirs: @@ -177,4 +189,4 @@ def save_result( pattern_dirs, is_case_sensitive, ) save_keypair(pv_bytes, target_dir) - return result_count + return len(_seen_pubkeys) - before_count diff --git a/core/utils/crypto.py b/core/utils/crypto.py index 658c47a..6614134 100644 --- a/core/utils/crypto.py +++ b/core/utils/crypto.py @@ -1,10 +1,14 @@ import json import logging from pathlib import Path +from typing import Set from base58 import b58encode from nacl.signing import SigningKey +# In-memory set to track saved keys within this run +_seen_pubkeys: Set[str] = set() + def get_public_key_from_private_bytes(pv_bytes: bytes) -> str: """ @@ -17,13 +21,27 @@ def get_public_key_from_private_bytes(pv_bytes: bytes) -> str: def save_keypair(pv_bytes: bytes, output_dir: str) -> str: """ - Save private key to JSON file, return public key + Save private key to JSON file, return public key. + Deduplicates via in-memory set + file existence check. """ + if len(pv_bytes) != 32: + logging.warning(f"Invalid private key length: {len(pv_bytes)} (expected 32)") + return "" + pv = SigningKey(pv_bytes) pb_bytes = bytes(pv.verify_key) pubkey = b58encode(pb_bytes).decode() + + if pubkey in _seen_pubkeys: + return pubkey + _seen_pubkeys.add(pubkey) + Path(output_dir).mkdir(parents=True, exist_ok=True) file_path = Path(output_dir) / f"{pubkey}.json" + + if file_path.exists(): + return pubkey + file_path.write_text(json.dumps(list(pv_bytes + pb_bytes))) logging.info(f"Found: {pubkey}") return pubkey From 460df5de4562f9653158388d0a02b5042dabda80 Mon Sep 17 00:00:00 2001 From: sullystuff Date: Mon, 16 Mar 2026 08:17:14 -0600 Subject: [PATCH 2/2] feat: adaptive console printing for high-throughput generation When generating keys at >5 keys/sec (common with short prefixes), per-key "Found: ..." log lines create excessive output. This adds rate-aware logging: - Track keys/sec over a rolling 1-second window - Above 5 keys/sec: suppress individual "Found:" lines, print a summary with rate once per second instead - Below threshold: keep normal per-key logging - Always print final progress at completion Co-Authored-By: Claude Opus 4.6 (1M context) --- core/cli.py | 27 +++++++++++++++++++++++++-- core/searcher.py | 3 ++- core/utils/crypto.py | 5 +++-- 3 files changed, 30 insertions(+), 5 deletions(-) diff --git a/core/cli.py b/core/cli.py index fc6a345..39d7882 100644 --- a/core/cli.py +++ b/core/cli.py @@ -138,11 +138,15 @@ def search_pubkey( logging.info(f"Using {gpu_counts} OpenCL device(s)") FLUSH_INTERVAL = 5.0 + PRINT_RATE_THRESHOLD = 5.0 # keys/sec; above this, suppress per-key logs found_count = 0 saved_total = 0 pending_results: List = [] last_flush = time.time() + last_print_time = time.time() + keys_since_last_print = 0 + high_throughput = False with multiprocessing.Manager() as manager: with Pool(processes=gpu_counts) as pool: @@ -179,8 +183,25 @@ def search_pubkey( if isinstance(res, (list, tuple, bytearray, bytes)) and len(res) > 0 and res[0]: pending_results.append(list(res)) found_count += 1 - if found_count % 100 == 0 or found_count == count: - logging.info(f"Progress: {found_count}/{count} matches") + keys_since_last_print += 1 + + elapsed_since_print = now - last_print_time + if elapsed_since_print >= 1.0: + rate = keys_since_last_print / elapsed_since_print + high_throughput = rate > PRINT_RATE_THRESHOLD + if high_throughput: + logging.info( + f"Progress: {found_count}/{count} ({rate:.1f} keys/sec)" + ) + else: + logging.info(f"Progress: {found_count}/{count}") + last_print_time = now + keys_since_last_print = 0 + elif found_count == count: + rate = keys_since_last_print / elapsed_since_print if elapsed_since_print > 0 else 0 + logging.info( + f"Progress: {found_count}/{count} ({rate:.1f} keys/sec)" + ) if (now - last_flush) >= FLUSH_INTERVAL or found_count >= count: if pending_results: @@ -188,6 +209,7 @@ def search_pubkey( pending_results, output_dir, prefix_patterns, suffix_patterns, pattern_dirs, is_case_sensitive, + quiet=high_throughput, ) saved_total += saved pending_results.clear() @@ -233,6 +255,7 @@ def search_pubkey( pending_results, output_dir, prefix_patterns, suffix_patterns, pattern_dirs, is_case_sensitive, + quiet=high_throughput, ) saved_total += saved pending_results.clear() diff --git a/core/searcher.py b/core/searcher.py index e906b83..ffd08c1 100644 --- a/core/searcher.py +++ b/core/searcher.py @@ -173,6 +173,7 @@ def save_result( ends_with: Tuple[str, ...] = (), pattern_dirs: Optional[Dict[str, str]] = None, is_case_sensitive: bool = True, + quiet: bool = False, ) -> int: from core.utils.crypto import get_public_key_from_private_bytes, save_keypair, _seen_pubkeys @@ -188,5 +189,5 @@ def save_result( pubkey, output_dir, starts_with, ends_with, pattern_dirs, is_case_sensitive, ) - save_keypair(pv_bytes, target_dir) + save_keypair(pv_bytes, target_dir, quiet=quiet) return len(_seen_pubkeys) - before_count diff --git a/core/utils/crypto.py b/core/utils/crypto.py index 6614134..c9ed064 100644 --- a/core/utils/crypto.py +++ b/core/utils/crypto.py @@ -19,7 +19,7 @@ def get_public_key_from_private_bytes(pv_bytes: bytes) -> str: return b58encode(pb_bytes).decode() -def save_keypair(pv_bytes: bytes, output_dir: str) -> str: +def save_keypair(pv_bytes: bytes, output_dir: str, quiet: bool = False) -> str: """ Save private key to JSON file, return public key. Deduplicates via in-memory set + file existence check. @@ -43,5 +43,6 @@ def save_keypair(pv_bytes: bytes, output_dir: str) -> str: return pubkey file_path.write_text(json.dumps(list(pv_bytes + pb_bytes))) - logging.info(f"Found: {pubkey}") + if not quiet: + logging.info(f"Found: {pubkey}") return pubkey