Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 117 additions & 16 deletions core/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
import multiprocessing
import sys
import time
import queue as _queue
from multiprocessing.pool import Pool
from typing import Dict, List, Optional, Tuple

Expand Down Expand Up @@ -135,37 +137,136 @@ def search_pubkey(
)
logging.info(f"Using {gpu_counts} OpenCL device(s)")

result_count = 0
FLUSH_INTERVAL = 5.0
PRINT_RATE_THRESHOLD = 5.0 # keys/sec; above this, suppress per-key logs

found_count = 0
saved_total = 0
pending_results: List = []
last_flush = time.time()
last_print_time = time.time()
keys_since_last_print = 0
high_throughput = False

with multiprocessing.Manager() as manager:
with Pool(processes=gpu_counts) as pool:
kernel_source = load_kernel_source(
prefix_patterns, suffix_patterns, is_case_sensitive
)
lock = manager.Lock()
while result_count < count:
stop_flag = manager.Value("i", 0)
results = pool.starmap(
multi_gpu_init,
[
result_queue = manager.Queue()
stop_flag = manager.Value("i", 0)

async_results = []
for x in range(gpu_counts):
async_results.append(
pool.apply_async(
multi_gpu_init,
(
x,
HostSetting(kernel_source, iteration_bits),
gpu_counts,
stop_flag,
lock,
result_queue,
chosen_devices,
)
for x in range(gpu_counts)
],
),
)
)
result_count += save_result(
results,
output_dir,
prefix_patterns,
suffix_patterns,
pattern_dirs,
is_case_sensitive,

while found_count < count:
try:
res = result_queue.get(timeout=1.0)
except _queue.Empty:
res = None
now = time.time()
if isinstance(res, (list, tuple, bytearray, bytes)) and len(res) > 0 and res[0]:
pending_results.append(list(res))
found_count += 1
keys_since_last_print += 1

elapsed_since_print = now - last_print_time
if elapsed_since_print >= 1.0:
rate = keys_since_last_print / elapsed_since_print
high_throughput = rate > PRINT_RATE_THRESHOLD
if high_throughput:
logging.info(
f"Progress: {found_count}/{count} ({rate:.1f} keys/sec)"
)
else:
logging.info(f"Progress: {found_count}/{count}")
last_print_time = now
keys_since_last_print = 0
elif found_count == count:
rate = keys_since_last_print / elapsed_since_print if elapsed_since_print > 0 else 0
logging.info(
f"Progress: {found_count}/{count} ({rate:.1f} keys/sec)"
)

if (now - last_flush) >= FLUSH_INTERVAL or found_count >= count:
if pending_results:
saved = save_result(
pending_results, output_dir,
prefix_patterns, suffix_patterns,
pattern_dirs, is_case_sensitive,
quiet=high_throughput,
)
saved_total += saved
pending_results.clear()
last_flush = now

with lock:
stop_flag.value = 1

logging.info("Signaled workers to stop; draining remaining results...")
while True:
try:
res = result_queue.get(timeout=0.5)
except _queue.Empty:
res = None

now = time.time()
if isinstance(res, (list, tuple, bytearray, bytes)) and len(res) > 0 and res[0]:
pending_results.append(list(res))

if (now - last_flush) >= FLUSH_INTERVAL and pending_results:
saved = save_result(
pending_results, output_dir,
prefix_patterns, suffix_patterns,
pattern_dirs, is_case_sensitive,
)
saved_total += saved
pending_results.clear()
last_flush = now

all_finished = all(a.ready() for a in async_results)
if all_finished:
while True:
try:
res = result_queue.get_nowait()
except _queue.Empty:
break
if isinstance(res, (list, tuple, bytearray, bytes)) and len(res) > 0 and res[0]:
pending_results.append(list(res))
break

if pending_results:
saved = save_result(
pending_results, output_dir,
prefix_patterns, suffix_patterns,
pattern_dirs, is_case_sensitive,
quiet=high_throughput,
)
saved_total += saved
pending_results.clear()

for a in async_results:
try:
a.get(timeout=10)
except Exception:
pass

logging.info(f"Search finished. Total matches found: {found_count}, total saved: {saved_total}")


@cli.command(context_settings={"show_default": True})
Expand Down
43 changes: 28 additions & 15 deletions core/searcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def find(self, log_stats: bool = True) -> bytearray:
cl.enqueue_copy(self.command_queue, self.memobj_key32, self.setting.key32)
global_work_size = self.setting.global_work_size // self.gpu_chunks
local_size = self.setting.local_work_size
global_size = ((global_work_size + local_size - 1) // local_size) * local_size # align global size and local size
global_size = ((global_work_size + local_size - 1) // local_size) * local_size
cl.enqueue_nd_range_kernel(
self.command_queue,
self.kernel,
Expand All @@ -76,14 +76,20 @@ def find(self, log_stats: bool = True) -> bytearray:
)
self.command_queue.flush()
self.setting.increase_key32()
if self.prev_time is not None and self.is_nvidia:
time.sleep(self.prev_time * 0.98)
cl.enqueue_copy(self.command_queue, self.output, self.memobj_output).wait()
self.prev_time = time.time() - start_time
if log_stats:
logging.info(
f"GPU {self.display_index} Speed: {global_work_size / ((time.time() - start_time) * 1e6):.2f} MH/s"
)

# If a match was found, clear the GPU output buffer so we don't report it again
if self.output[0]:
result = bytearray(self.output)
self.output[:] = bytearray(33)
cl.enqueue_copy(self.command_queue, self.memobj_output, self.output).wait()
return result

return self.output


Expand All @@ -93,8 +99,13 @@ def multi_gpu_init(
gpu_counts: int,
stop_flag,
lock,
result_queue,
chosen_devices: Optional[Tuple[int, List[int]]] = None,
) -> List:
) -> None:
"""
Long-running worker for a single GPU. Pushes matches into result_queue
and exits when stop_flag.value is set.
"""
try:
searcher = Searcher(
kernel_source=setting.kernel_source,
Expand All @@ -107,21 +118,23 @@ def multi_gpu_init(
while True:
result = searcher.find(i == 0)
if result[0]:
with lock:
if not stop_flag.value:
stop_flag.value = 1
return list(result)
try:
result_queue.put(list(result))
except Exception:
logging.exception("Failed to put result into queue")
if time.time() - st > max(gpu_counts, 1):
i = 0
st = time.time()
with lock:
if stop_flag.value:
return list(result)
break
else:
i += 1
if stop_flag.value:
break
except Exception as e:
logging.exception(e)
return [0]
return


def _resolve_output_dir(
Expand Down Expand Up @@ -160,14 +173,14 @@ def save_result(
ends_with: Tuple[str, ...] = (),
pattern_dirs: Optional[Dict[str, str]] = None,
is_case_sensitive: bool = True,
quiet: bool = False,
) -> int:
from core.utils.crypto import get_public_key_from_private_bytes, save_keypair
from core.utils.crypto import get_public_key_from_private_bytes, save_keypair, _seen_pubkeys

result_count = 0
before_count = len(_seen_pubkeys)
for output in outputs:
if not output[0]:
continue
result_count += 1
pv_bytes = bytes(output[1:])
target_dir = output_dir
if pattern_dirs:
Expand All @@ -176,5 +189,5 @@ def save_result(
pubkey, output_dir, starts_with, ends_with,
pattern_dirs, is_case_sensitive,
)
save_keypair(pv_bytes, target_dir)
return result_count
save_keypair(pv_bytes, target_dir, quiet=quiet)
return len(_seen_pubkeys) - before_count
25 changes: 22 additions & 3 deletions core/utils/crypto.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import json
import logging
from pathlib import Path
from typing import Set

from base58 import b58encode
from nacl.signing import SigningKey

# In-memory set to track saved keys within this run
_seen_pubkeys: Set[str] = set()


def get_public_key_from_private_bytes(pv_bytes: bytes) -> str:
"""
Expand All @@ -15,15 +19,30 @@ def get_public_key_from_private_bytes(pv_bytes: bytes) -> str:
return b58encode(pb_bytes).decode()


def save_keypair(pv_bytes: bytes, output_dir: str) -> str:
def save_keypair(pv_bytes: bytes, output_dir: str, quiet: bool = False) -> str:
"""
Save private key to JSON file, return public key
Save private key to JSON file, return public key.
Deduplicates via in-memory set + file existence check.
"""
if len(pv_bytes) != 32:
logging.warning(f"Invalid private key length: {len(pv_bytes)} (expected 32)")
return ""

pv = SigningKey(pv_bytes)
pb_bytes = bytes(pv.verify_key)
pubkey = b58encode(pb_bytes).decode()

if pubkey in _seen_pubkeys:
return pubkey
_seen_pubkeys.add(pubkey)

Path(output_dir).mkdir(parents=True, exist_ok=True)
file_path = Path(output_dir) / f"{pubkey}.json"

if file_path.exists():
return pubkey

file_path.write_text(json.dumps(list(pv_bytes + pb_bytes)))
logging.info(f"Found: {pubkey}")
if not quiet:
logging.info(f"Found: {pubkey}")
return pubkey