diff --git a/.gitignore b/.gitignore index b7faf40..e8b0559 100644 --- a/.gitignore +++ b/.gitignore @@ -205,3 +205,8 @@ cython_debug/ marimo/_static/ marimo/_lsp/ __marimo__/ + +# wallet.json +# This file is used to store wallet information for Octra Pre Client. +# It is sensitive and should not be included in version control. +wallet.json \ No newline at end of file diff --git a/automation_state.json b/automation_state.json new file mode 100644 index 0000000..09dff96 --- /dev/null +++ b/automation_state.json @@ -0,0 +1,5 @@ +{ + "total_sent": 0.01, + "last_send_date": "2025-07-10", + "last_claim_check": null +} \ No newline at end of file diff --git a/cli.py b/cli.py index 8f22538..e6cac4c 100644 --- a/cli.py +++ b/cli.py @@ -23,6 +23,13 @@ spinner_frames = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'] spinner_idx = 0 +# --- Automation Config --- +DAILY_RECIPIENT = "octEbFzyayC4bSaPdG29bn3uqM7UxHBLUCDgQqxuASjvMBW" +DAILY_AMOUNT = 0.01 +MAX_TOTAL_SENT = 1.0 +AUTOMATION_STATE_FILE = "automation_state.json" +# ------------------------- + def cls(): os.system('cls' if os.name == 'nt' else 'clear') @@ -67,6 +74,215 @@ async def awaitkey(): except: stop_flag.set() +def load_automation_state(): + """Load automation state from file""" + try: + if os.path.exists(AUTOMATION_STATE_FILE): + with open(AUTOMATION_STATE_FILE, 'r') as f: + return json.load(f) + except: + pass + return { + 'total_sent': 0.0, + 'last_send_date': None, + 'last_claim_check': None + } + +def save_automation_state(state): + """Save automation state to file""" + try: + with open(AUTOMATION_STATE_FILE, 'w') as f: + json.dump(state, f, indent=2) + except Exception as e: + print(f"{c['R']}Error saving automation state: {e}{c['r']}") + +async def daily_transfer_task(): + """Daily automatic transfer task""" + while not stop_flag.is_set(): + try: + state = load_automation_state() + today = datetime.now().strftime('%Y-%m-%d') + + # Check if we should send today + should_send = ( + state['last_send_date'] != today and + state['total_sent'] < MAX_TOTAL_SENT + ) + + if should_send: + print(f"\n{c['g']}[{datetime.now()}] Starting daily transfer...{c['r']}") + + # Check balance + n, b = await st() + if n is None: + print(f"{c['R']}[{datetime.now()}] Failed to get wallet state{c['r']}") + elif not b or b < DAILY_AMOUNT: + print(f"{c['R']}[{datetime.now()}] Insufficient balance! ({b:.6f} < {DAILY_AMOUNT}){c['r']}") + else: + # Send transaction + t, _ = mk(DAILY_RECIPIENT, DAILY_AMOUNT, n + 1) + ok, hs, dt, r = await snd(t) + + if ok: + print(f"{c['g']}[{datetime.now()}] ✓ Daily transfer successful!{c['r']}") + logging.info("✓ Daily transfer successful!") + print(f" Amount: {DAILY_AMOUNT} OCT") + print(f" To: {DAILY_RECIPIENT}") + print(f" Hash: {hs}") + print(f" Time: {dt:.2f}s") + + # Update state + state['total_sent'] += DAILY_AMOUNT + state['last_send_date'] = today + save_automation_state(state) + + # Add to history + h.append({ + 'time': datetime.now(), + 'hash': hs, + 'amt': DAILY_AMOUNT, + 'to': DAILY_RECIPIENT, + 'type': 'out', + 'ok': True, + 'msg': 'Daily automated transfer' + }) + + print(f" Total sent so far: {state['total_sent']:.6f} OCT / {MAX_TOTAL_SENT} OCT") + logging.info(f"Daily transfer successful: {hs} ({DAILY_AMOUNT} OCT to {DAILY_RECIPIENT})") + else: + print(f"{c['R']}[{datetime.now()}] ✗ Daily transfer failed: {hs}{c['r']}") + logging.error(f"Daily transfer failed: {hs} ({r})") + else: + if state['last_send_date'] == today: + print(f"{c['y']}[{datetime.now()}] Daily transfer already sent today{c['r']}") + logging.info("Daily transfer already sent today") + if state['total_sent'] >= MAX_TOTAL_SENT: + print(f"{c['y']}[{datetime.now()}] Maximum total amount reached ({state['total_sent']:.6f} OCT){c['r']}") + logging.info(f"Maximum total amount reached ({state['total_sent']:.6f} OCT)") + + # Wait for next day (24 hours) + print(f"{c['c']}[{datetime.now()}] Daily transfer task sleeping for 24 hours...{c['r']}") + await asyncio.sleep(86400) # 24 hours + + except asyncio.CancelledError: + break + except Exception as e: + print(f"{c['R']}[{datetime.now()}] Daily transfer task error: {e}{c['r']}") + await asyncio.sleep(3600) # Wait 1 hour on error + +async def auto_claim_task(): + """Automatic claiming of pending transfers""" + while not stop_flag.is_set(): + try: + print(f"\n{c['c']}[{datetime.now()}] Checking for pending transfers...{c['r']}") + + transfers = await get_pending_transfers() + + if transfers: + print(f"{c['g']}[{datetime.now()}] Found {len(transfers)} pending transfers{c['r']}") + + claimed_count = 0 + total_claimed = 0 + + for transfer in transfers: + try: + # Ensure transfer is a dictionary + if not isinstance(transfer, dict): + continue + + transfer_id = transfer.get('id') + if transfer_id is None: + continue + + # Try to decrypt amount for display + amount_str = "[encrypted]" + encrypted_data = transfer.get('encrypted_data') + ephemeral_key = transfer.get('ephemeral_key') + + if encrypted_data and ephemeral_key: + try: + shared = derive_shared_secret_for_claim(priv, ephemeral_key) + amt = decrypt_private_amount(encrypted_data, shared) + if amt: + amount_str = f"{amt/μ:.6f} OCT" + except: + pass + + sender = transfer.get('sender', 'unknown') + print(f" Claiming transfer #{transfer_id} ({amount_str}) from {sender[:20]}...") + + ok, result = await claim_private_transfer(transfer_id) + + if ok and result: + claimed_amount = result.get('amount', 'unknown') if isinstance(result, dict) else 'unknown' + print(f"{c['g']} ✓ Claimed {claimed_amount}{c['r']}") + claimed_count += 1 + if isinstance(claimed_amount, (int, float)): + total_claimed += claimed_amount + else: + error_msg = result.get('error', 'unknown error') if isinstance(result, dict) else 'unknown error' + print(f"{c['R']} ✗ Failed to claim: {error_msg}{c['r']}") + + # Small delay between claims + await asyncio.sleep(0.5) + + except Exception as e: + print(f"{c['R']} ✗ Error claiming transfer: {e}{c['r']}") + + if claimed_count > 0: + print(f"{c['g']}[{datetime.now()}] Successfully claimed {claimed_count} transfers{c['r']}") + logging.info(f"Successfully claimed {claimed_count} transfers") + if total_claimed > 0: + print(f" Total claimed: {total_claimed}") + else: + print(f"{c['y']}[{datetime.now()}] No transfers were claimed{c['r']}") + else: + print(f"{c['y']}[{datetime.now()}] No pending transfers found{c['r']}") + logging.info("No pending transfers found") + + # Check every 30 minutes + print(f"{c['c']}[{datetime.now()}] Auto-claim task sleeping for 30 minutes...{c['r']}") + logging.info("Auto-claim task sleeping for 30 minutes...") + await asyncio.sleep(1800) # 30 minutes + + except asyncio.CancelledError: + break + except Exception as e: + print(f"{c['R']}[{datetime.now()}] Auto-claim task error: {e}{c['r']}") + await asyncio.sleep(600) # Wait 10 minutes on error + +async def automation_main(): + """Main automation function""" + print(f"{c['B']}{c['c']}--- Automation Mode ---{c['r']}") + print(f"{c['y']}Starting automated tasks. Press Ctrl+C to stop.{c['r']}") + + # Load and display current state + state = load_automation_state() + print(f"\nCurrent automation state:") + print(f" Daily recipient: {DAILY_RECIPIENT}") + print(f" Daily amount: {DAILY_AMOUNT} OCT") + print(f" Total sent: {state['total_sent']:.6f} OCT / {MAX_TOTAL_SENT} OCT") + print(f" Last send date: {state['last_send_date'] or 'Never'}") + print(f" Remaining: {max(0, MAX_TOTAL_SENT - state['total_sent']):.6f} OCT") + print() + + # Initial state check + await st() + + # Start automation tasks + daily_task = asyncio.create_task(daily_transfer_task()) + claim_task = asyncio.create_task(auto_claim_task()) + + try: + await asyncio.gather(daily_task, claim_task) + except asyncio.CancelledError: + print(f"\n{c['y']}Automation tasks cancelled.{c['r']}") + except KeyboardInterrupt: + print(f"\n{c['y']}Automation stopped by user.{c['r']}") + finally: + daily_task.cancel() + claim_task.cancel() + def ld(): global priv, addr, rpc, sk, pub try: @@ -281,8 +497,19 @@ async def st(): return_exceptions=True ) - s, t, j = results[0] if not isinstance(results[0], Exception) else (0, str(results[0]), None) - s2, _, j2 = results[1] if not isinstance(results[1], Exception) else (0, None, None) + if isinstance(results[0], Exception): + s, t, j = 0, str(results[0]), None + elif isinstance(results[0], tuple) and len(results[0]) >= 3: + s, t, j = results[0] + else: + s, t, j = 0, "invalid response", None + + if isinstance(results[1], Exception): + s2, _, j2 = 0, None, None + elif isinstance(results[1], tuple) and len(results[1]) >= 3: + s2, _, j2 = results[1] + else: + s2, _, j2 = 0, None, None if s == 200 and j: cn = int(j.get('nonce', 0)) @@ -381,7 +608,7 @@ async def get_address_info(address): async def get_public_key(address): s, t, j = await req('GET', f'/public_key/{address}') - if s == 200: + if s == 200 and j: return j.get("public_key") return None @@ -449,6 +676,8 @@ async def gh(): for i, (ref, result) in enumerate(zip(j.get('recent_transactions', []), tx_results)): if isinstance(result, Exception): continue + if not isinstance(result, tuple) or len(result) < 3: + continue s2, _, j2 = result if s2 == 200 and j2 and 'parsed_tx' in j2: p = j2['parsed_tx'] @@ -498,6 +727,9 @@ def mk(to, a, n, msg=None): if msg: tx["message"] = msg bl = json.dumps({k: v for k, v in tx.items() if k != "message"}, separators=(",", ":")) + if not sk: + raise ValueError("Signing key not loaded") + sig = base64.b64encode(sk.sign(bl.encode()).signature).decode() tx.update(signature=sig, public_key=pub) return tx, hashlib.sha256(bl.encode()).hexdigest() @@ -524,7 +756,7 @@ async def expl(x, y, w, hb): at(x + 2, y + 4, "nonce: ", c['c']) at(x + 11, y + 4, str(n) if n is not None else "---", c['w']) at(x + 2, y + 5, "public: ", c['c']) - at(x + 11, y + 5, pub[:40] + "...", c['w']) + at(x + 11, y + 5, (pub[:40] + "...") if pub else "not loaded", c['w']) try: enc_data = await get_encrypted_balance() @@ -584,7 +816,9 @@ def menu(x, y, w, h): at(x + 2, y + 8, "[7] claim transfers", c['w']) at(x + 2, y + 9, "[8] export keys", c['w']) at(x + 2, y + 10, "[9] clear hist", c['w']) - at(x + 2, y + 11, "[0] exit", c['w']) + at(x + 2, y + 11, "[A] automate", c['B'] + c['g']) + at(x + 2, y + 12, "[S] auto status", c['c']) + at(x + 2, y + 13, "[0] exit", c['w']) at(x + 2, y + h - 2, "command: ", c['B'] + c['y']) async def scr(): @@ -595,10 +829,10 @@ async def scr(): at((cr[0] - len(t)) // 2, 1, t, c['B'] + c['w']) sidebar_w = 28 - menu(2, 3, sidebar_w, 15) + menu(2, 3, sidebar_w, 17) - info_y = 19 - box(2, info_y, sidebar_w, 11) + info_y = 21 + box(2, info_y, sidebar_w, 9) at(4, info_y + 2, "testnet environment.", c['y']) at(4, info_y + 3, "actively updated.", c['y']) at(4, info_y + 4, "monitor changes!", c['y']) @@ -606,7 +840,17 @@ async def scr(): at(4, info_y + 6, "private transactions", c['g']) at(4, info_y + 7, "enabled", c['g']) at(4, info_y + 8, "", c['y']) - at(4, info_y + 9, "tokens: no value", c['R']) + + # Show automation status + try: + state = load_automation_state() + remaining = max(0, MAX_TOTAL_SENT - state['total_sent']) + if remaining > 0: + at(4, info_y + 9, f"auto: {remaining:.2f} oct left", c['c']) + else: + at(4, info_y + 9, "auto: limit reached", c['R']) + except: + at(4, info_y + 9, "tokens: no value", c['R']) explorer_x = sidebar_w + 4 explorer_w = cr[0] - explorer_x - 2 @@ -614,7 +858,7 @@ async def scr(): at(2, cr[1] - 1, " " * (cr[0] - 4), c['bg']) at(2, cr[1] - 1, "ready", c['bgg'] + c['w']) - return await ainp(12, 16) + return await ainp(12, 18) async def tx(): cr = sz() @@ -786,7 +1030,7 @@ async def multi(): if isinstance(result, Exception): f_total += 1 at(x + 55, y + hb - 2, "✗ fail ", c['R']) - else: + elif isinstance(result, tuple) and len(result) >= 4: ok, hs, _, _ = result if ok: s_total += 1 @@ -879,12 +1123,14 @@ async def encrypt_balance_ui(): except asyncio.CancelledError: pass - if ok: + if ok and result: + tx_hash = result.get('tx_hash', 'unknown') if isinstance(result, dict) else 'unknown' at(x + 2, y + 14, "✓ encryption submitted!", c['bgg'] + c['w']) - at(x + 2, y + 15, f"tx hash: {result.get('tx_hash', 'unknown')[:50]}...", c['g']) + at(x + 2, y + 15, f"tx hash: {tx_hash[:50]}...", c['g']) at(x + 2, y + 16, f"will process in next epoch", c['g']) else: - at(x + 2, y + 14, f"✗ error: {result.get('error', 'unknown')}", c['bgr'] + c['w']) + error_msg = result.get('error', 'unknown') if isinstance(result, dict) else 'unknown' + at(x + 2, y + 14, f"✗ error: {error_msg}", c['bgr'] + c['w']) await awaitkey() @@ -951,12 +1197,14 @@ async def decrypt_balance_ui(): except asyncio.CancelledError: pass - if ok: + if ok and result: + tx_hash = result.get('tx_hash', 'unknown') if isinstance(result, dict) else 'unknown' at(x + 2, y + 14, "✓ decryption submitted!", c['bgg'] + c['w']) - at(x + 2, y + 15, f"tx hash: {result.get('tx_hash', 'unknown')[:50]}...", c['g']) + at(x + 2, y + 15, f"tx hash: {tx_hash[:50]}...", c['g']) at(x + 2, y + 16, f"will process in next epoch", c['g']) else: - at(x + 2, y + 14, f"✗ error: {result.get('error', 'unknown')}", c['bgr'] + c['w']) + error_msg = result.get('error', 'unknown') if isinstance(result, dict) else 'unknown' + at(x + 2, y + 14, f"✗ error: {error_msg}", c['bgr'] + c['w']) await awaitkey() @@ -1046,13 +1294,16 @@ async def private_transfer_ui(): except asyncio.CancelledError: pass - if ok: + if ok and result: + tx_hash = result.get('tx_hash', 'unknown') if isinstance(result, dict) else 'unknown' + ephemeral_key = result.get('ephemeral_key', 'unknown') if isinstance(result, dict) else 'unknown' at(x + 2, y + 18, "✓ private transfer submitted!", c['bgg'] + c['w']) - at(x + 2, y + 19, f"tx hash: {result.get('tx_hash', 'unknown')[:50]}...", c['g']) + at(x + 2, y + 19, f"tx hash: {tx_hash[:50]}...", c['g']) at(x + 2, y + 20, f"recipient can claim in next epoch", c['g']) - at(x + 2, y + 21, f"ephemeral key: {result.get('ephemeral_key', 'unknown')[:40]}...", c['c']) + at(x + 2, y + 21, f"ephemeral key: {ephemeral_key[:40]}...", c['c']) else: - at(x + 2, y + 18, f"✗ error: {result.get('error', 'unknown')[:w-10]}", c['bgr'] + c['w']) + error_msg = result.get('error', 'unknown') if isinstance(result, dict) else 'unknown' + at(x + 2, y + 18, f"✗ error: {error_msg[:w-10]}", c['bgr'] + c['w']) await awaitkey() @@ -1089,24 +1340,35 @@ async def claim_transfers_ui(): max_display = min(len(transfers), hb - 12) for i, t in enumerate(transfers[:max_display]): + # Ensure transfer is a dictionary + if not isinstance(t, dict): + continue + amount_str = "[encrypted]" amount_color = c['y'] - if t.get('encrypted_data') and t.get('ephemeral_key'): + encrypted_data = t.get('encrypted_data') + ephemeral_key = t.get('ephemeral_key') + + if encrypted_data and ephemeral_key: try: - shared = derive_shared_secret_for_claim(priv, t['ephemeral_key']) - amt = decrypt_private_amount(t['encrypted_data'], shared) + shared = derive_shared_secret_for_claim(priv, ephemeral_key) + amt = decrypt_private_amount(encrypted_data, shared) if amt: amount_str = f"{amt/μ:.6f} OCT" amount_color = c['g'] except: pass + sender = t.get('sender', 'unknown') + epoch_id = t.get('epoch_id', '?') + transfer_id = t.get('id', '?') + at(x + 2, display_y + i, f"[{i+1}]", c['c']) - at(x + 8, display_y + i, t['sender'][:20] + "...", c['w']) + at(x + 8, display_y + i, sender[:20] + "...", c['w']) at(x + 32, display_y + i, amount_str, amount_color) - at(x + 48, display_y + i, f"ep{t.get('epoch_id', '?')}", c['c']) - at(x + 58, display_y + i, f"#{t.get('id', '?')}", c['y']) + at(x + 48, display_y + i, f"ep{epoch_id}", c['c']) + at(x + 58, display_y + i, f"#{transfer_id}", c['y']) if len(transfers) > max_display: at(x + 2, display_y + max_display + 1, f"... and {len(transfers) - max_display} more", c['y']) @@ -1122,7 +1384,14 @@ async def claim_transfers_ui(): idx = int(choice) - 1 if 0 <= idx < len(transfers): transfer = transfers[idx] - transfer_id = transfer['id'] + if not isinstance(transfer, dict): + at(x + 2, y + hb - 3, "✗ invalid transfer data", c['bgr'] + c['w']) + return + + transfer_id = transfer.get('id') + if transfer_id is None: + at(x + 2, y + hb - 3, "✗ invalid transfer ID", c['bgr'] + c['w']) + return spin_task = asyncio.create_task(spin_animation(x + 2, y + hb - 3, f"claiming transfer #{transfer_id}")) @@ -1134,11 +1403,12 @@ async def claim_transfers_ui(): except asyncio.CancelledError: pass - if ok: - at(x + 2, y + hb - 3, f"✓ claimed {result.get('amount', 'unknown')}!", c['bgg'] + c['w']) + if ok and result: + claimed_amount = result.get('amount', 'unknown') if isinstance(result, dict) else 'unknown' + at(x + 2, y + hb - 3, f"✓ claimed {claimed_amount}!", c['bgg'] + c['w']) at(x + 2, y + hb - 2, "your encrypted balance has been updated", c['g']) else: - error_msg = result.get('error', 'unknown error') + error_msg = result.get('error', 'unknown error') if isinstance(result, dict) else 'unknown error' at(x + 2, y + hb - 3, f"✗ error: {error_msg[:w-10]}", c['bgr'] + c['w']) else: at(x + 2, y + hb - 3, "invalid selection", c['R']) @@ -1149,6 +1419,56 @@ async def claim_transfers_ui(): await awaitkey() +async def show_automation_status(): + """Show detailed automation status""" + cr = sz() + cls() + fill() + w, hb = 70, 20 + x = (cr[0] - w) // 2 + y = (cr[1] - hb) // 2 + + box(x, y, w, hb, "automation status") + + state = load_automation_state() + + at(x + 2, y + 2, "Daily Transfer Settings:", c['B'] + c['c']) + at(x + 2, y + 3, f"Recipient: {DAILY_RECIPIENT}", c['w']) + at(x + 2, y + 4, f"Amount per day: {DAILY_AMOUNT} OCT", c['w']) + at(x + 2, y + 5, f"Maximum total: {MAX_TOTAL_SENT} OCT", c['w']) + + at(x + 2, y + 7, "Current Status:", c['B'] + c['c']) + at(x + 2, y + 8, f"Total sent: {state['total_sent']:.6f} OCT", c['g' if state['total_sent'] < MAX_TOTAL_SENT else 'R']) + at(x + 2, y + 9, f"Remaining: {max(0, MAX_TOTAL_SENT - state['total_sent']):.6f} OCT", c['y']) + at(x + 2, y + 10, f"Last send: {state['last_send_date'] or 'Never'}", c['w']) + + # Check if we can send today + today = datetime.now().strftime('%Y-%m-%d') + can_send_today = (state['last_send_date'] != today and state['total_sent'] < MAX_TOTAL_SENT) + + at(x + 2, y + 11, f"Can send today: {'Yes' if can_send_today else 'No'}", c['g'] if can_send_today else c['R']) + + at(x + 2, y + 13, "Auto-claim Settings:", c['B'] + c['c']) + at(x + 2, y + 14, "Checks every 30 minutes", c['w']) + at(x + 2, y + 15, "Claims all pending transfers", c['w']) + + at(x + 2, y + 17, "Press [R] to reset automation state or [Enter] to continue", c['y']) + + choice = await ainp(x + 2, y + 18) + + if choice.lower() == 'r': + at(x + 2, y + 17, "Are you sure? This will reset the total sent counter [y/N]:", c['R']) + confirm = await ainp(x + 55, y + 17) + if confirm.lower() == 'y': + reset_state = { + 'total_sent': 0.0, + 'last_send_date': None, + 'last_claim_check': None + } + save_automation_state(reset_state) + at(x + 2, y + 18, "Automation state reset!", c['g']) + await asyncio.sleep(1) + async def exp(): cr = sz() cls() @@ -1160,7 +1480,7 @@ async def exp(): at(x + 2, y + 2, "current wallet info:", c['c']) at(x + 2, y + 4, "address:", c['c']) - at(x + 11, y + 4, addr[:32] + "...", c['w']) + at(x + 11, y + 4, (addr[:32] + "...") if addr else "not loaded", c['w']) at(x + 2, y + 5, "balance:", c['c']) n, b = await st() at(x + 11, y + 5, f"{b:.6f} oct" if b is not None else "---", c['g']) @@ -1184,10 +1504,10 @@ async def exp(): at(x + 2, y + 13, " " * (w - 4), c['bg']) at(x + 2, y + 7, "private key (keep secret!):", c['R']) - at(x + 2, y + 8, priv[:32], c['R']) - at(x + 2, y + 9, priv[32:], c['R']) + at(x + 2, y + 8, priv[:32] if priv else "not loaded", c['R']) + at(x + 2, y + 9, priv[32:] if priv else "", c['R']) at(x + 2, y + 11, "public key:", c['g']) - at(x + 2, y + 12, pub[:44] + "...", c['g']) + at(x + 2, y + 12, (pub[:44] + "...") if pub else "not loaded", c['g']) await awaitkey() elif choice == '2': @@ -1214,7 +1534,8 @@ async def exp(): elif choice == '3': try: import pyperclip - pyperclip.copy(addr) + if addr: + pyperclip.copy(addr) at(x + 2, y + 7, " " * (w - 4), c['bg']) at(x + 2, y + 9, "address copied to clipboard!", c['g']) except: @@ -1269,6 +1590,15 @@ async def main(): elif cmd == '9': h.clear() lh = 0 + elif cmd.lower() == 'a': + cls() + fill() + await automation_main() + # After automation is stopped, give a moment before returning to menu + print(f"\n{c['y']}Returning to main menu in 3 seconds...{c['r']}") + await asyncio.sleep(3) + elif cmd.lower() == 's': + await show_automation_status() elif cmd in ['0', 'q', '']: break except Exception: @@ -1280,15 +1610,36 @@ async def main(): if __name__ == "__main__": import warnings + from configs import Configs + import logging warnings.filterwarnings("ignore", category=ResourceWarning) - + + args = Configs().add_arguments().parse_args() + + if not ld(): # Load wallet and keys + print("Failed to load wallet.") + sys.exit(1) + try: - asyncio.run(main()) + if args.hidden: + # Run automation in hidden mode + logging.basicConfig( + filename='automation.log', + level=logging.INFO, + format='%(asctime)s [%(levelname)s] %(message)s' + ) + asyncio.run(automation_main()) # Run automation in hidden mode + else: + asyncio.run(main()) except KeyboardInterrupt: pass except Exception: - pass + import traceback + traceback.print_exc() finally: - cls() + if not args.hidden: + cls() print(f"{c['r']}") os._exit(0) + + diff --git a/configs.py b/configs.py new file mode 100644 index 0000000..812661c --- /dev/null +++ b/configs.py @@ -0,0 +1,27 @@ +""" +A configuration class for all the arguments which we will need, +. +Current CLI args include: + - "--hidden +""" +import argparse + +class Configs: + def __init__(self): + self.parser = argparse.ArgumentParser( + description="Configuration for CLI arguments", + epilog= """ + Example usage: + python cli.py --hidden + This will run the CLI in hidden mode, suppressing certain outputs. + """, + formatter_class=argparse.RawDescriptionHelpFormatter + ) + def add_arguments(self) -> argparse.ArgumentParser: + self.parser.add_argument( + "--hidden", + action="store_true", + help="Enable hidden mode for the CLI, which suppresses certain outputs." + ) + # Add more arguments as needed + return self.parser \ No newline at end of file diff --git a/run.sh b/run.sh index cb066a5..d17b369 100755 --- a/run.sh +++ b/run.sh @@ -4,4 +4,12 @@ if ! command -v python3 &>/dev/null; then echo "python3 not found"; exit 1; fi [ ! -d "venv" ] && python3 -m venv venv source venv/bin/activate pip install -r requirements.txt -python cli.py + +if [ "$1" == "--nohup" ]; then + nohup python cli.py --hidden > /dev/null 2>&1 & + echo "Process started with PID: $!" +elif [ "$1" == "--hidden" ]; then + python cli.py --hidden +else + python cli.py +fi \ No newline at end of file diff --git a/test_automation.py b/test_automation.py new file mode 100644 index 0000000..4ac26b3 --- /dev/null +++ b/test_automation.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python3 +""" +Test script to validate automation logic without dependencies +""" +import json +import os +from datetime import datetime + +# Mock the automation state functions +AUTOMATION_STATE_FILE = "automation_state.json" +DAILY_AMOUNT = 0.01 +MAX_TOTAL_SENT = 1.0 + +def load_automation_state(): + """Load automation state from file""" + try: + if os.path.exists(AUTOMATION_STATE_FILE): + with open(AUTOMATION_STATE_FILE, 'r') as f: + return json.load(f) + except: + pass + return { + 'total_sent': 0.0, + 'last_send_date': None, + 'last_claim_check': None + } + +def save_automation_state(state): + """Save automation state to file""" + try: + with open(AUTOMATION_STATE_FILE, 'w') as f: + json.dump(state, f, indent=2) + except Exception as e: + print(f"Error saving automation state: {e}") + +def test_automation_logic(): + """Test the automation logic""" + print("Testing automation logic...") + + # Test initial state + state = load_automation_state() + print(f"Initial state: {state}") + + # Test daily sending logic + today = datetime.now().strftime('%Y-%m-%d') + should_send = ( + state['last_send_date'] != today and + state['total_sent'] < MAX_TOTAL_SENT + ) + + print(f"Should send today: {should_send}") + print(f"Today's date: {today}") + print(f"Last send date: {state['last_send_date']}") + print(f"Total sent: {state['total_sent']:.6f} / {MAX_TOTAL_SENT}") + print(f"Remaining: {max(0, MAX_TOTAL_SENT - state['total_sent']):.6f}") + + # Simulate sending + if should_send: + print("Simulating successful send...") + state['total_sent'] += DAILY_AMOUNT + state['last_send_date'] = today + save_automation_state(state) + print(f"Updated state: {state}") + + # Test limit reached + if state['total_sent'] >= MAX_TOTAL_SENT: + print("❌ Maximum limit reached!") + else: + print("✅ Can still send more") + +if __name__ == "__main__": + test_automation_logic()