diff --git a/python/kaspa/__init__.pyi b/python/kaspa/__init__.pyi index b5a65a22..2b854c37 100644 --- a/python/kaspa/__init__.pyi +++ b/python/kaspa/__init__.pyi @@ -127,6 +127,13 @@ class AccountKind: Returns: str: The account kind as a string. """ + def __repr__(self) -> builtins.str: + r""" + The detailed string representation. + + Returns: + str: The account kind as a repr string. + """ def to_string(self) -> builtins.str: r""" Get the string representation. diff --git a/src/wallet/core/account/kind.rs b/src/wallet/core/account/kind.rs index 28cfa86b..a3d73179 100644 --- a/src/wallet/core/account/kind.rs +++ b/src/wallet/core/account/kind.rs @@ -49,6 +49,14 @@ impl PyAccountKind { self.py_to_string() } + /// The detailed string representation. + /// + /// Returns: + /// str: The account kind as a repr string. + pub fn __repr__(&self) -> String { + format!("AccountKind('{}')", self.0.as_str()) + } + /// Get the string representation. /// /// Returns: diff --git a/tests/conftest.py b/tests/conftest.py index d544fb0f..c9a966c6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,6 +2,10 @@ Shared fixtures for Kaspa Python SDK tests. """ +import os +import shutil +import uuid + import pytest import pytest_asyncio @@ -14,6 +18,7 @@ Address, RpcClient, Resolver, + Wallet, ) @@ -146,3 +151,123 @@ def test_address(network_id): if network_id.startswith("testnet"): return "kaspatest:qr0lr4ml9fn3chekrqmjdkergxl93l4wrk3dankcgvjq776s9wn9jhtkdksae" return TEST_MAINNET_ADDRESS + + +# ============================================================================= +# Wallet Fixtures (Unit-test scope — no network required) +# ============================================================================= + +# Default passwords used across wallet unit tests. +TEST_WALLET_SECRET = "test-wallet-secret" +TEST_NEW_WALLET_SECRET = "test-wallet-secret-new" + +# Directory where the local wallet store persists wallet files. +_KASPA_LOCAL_STORE_DIR = os.path.expanduser("~/.kaspa") + + +def cleanup_wallet_files(filename: str) -> None: + """Remove any wallet/transaction artifacts left by a test run. + + Sweeps both `~/.kaspa/` (default LocalStore folder) and the current + working directory. The CWD sweep covers `wallet_rename` with a bare + filename — upstream `Storage::rename_sync` treats the argument as a + plain `PathBuf`, so the renamed file lands in the pytest CWD, not + `~/.kaspa/`. + """ + root = os.path.join(_KASPA_LOCAL_STORE_DIR, filename) + cwd_root = os.path.join(os.getcwd(), filename) + candidates = ( + f"{root}.wallet", + f"{root}.transactions", + root, + cwd_root, + f"{cwd_root}.wallet", + f"{cwd_root}.transactions", + ) + for candidate in candidates: + if os.path.isdir(candidate): + shutil.rmtree(candidate, ignore_errors=True) + else: + try: + os.remove(candidate) + except (FileNotFoundError, IsADirectoryError, PermissionError): + pass + + +@pytest.fixture +def unique_wallet_filename() -> str: + """Return a unique wallet filename (without extension) for isolated tests.""" + return f"unit-test-{uuid.uuid4().hex[:12]}" + + +@pytest_asyncio.fixture +async def started_wallet(): + """A started Wallet (testnet-10) with no wallet file opened. + + Yields the Wallet; `stop()` is awaited on teardown. + """ + wallet = Wallet(network_id="testnet-10", resolver=Resolver()) + await wallet.start() + try: + yield wallet + finally: + if wallet.is_open: + await wallet.wallet_close() + await wallet.stop() + + +@pytest_asyncio.fixture +async def open_wallet(unique_wallet_filename): + """A started Wallet with a freshly created wallet file opened. + + Yields (wallet, filename). Cleans up the wallet file on teardown. + """ + wallet = Wallet(network_id="testnet-10", resolver=Resolver()) + await wallet.start() + # The wallet title is also used as the filename when the wallet is + # re-imported from an exported payload; keep it unique per-test so + # export/import flows don't collide with other tests on disk. + title = unique_wallet_filename + try: + await wallet.wallet_create( + wallet_secret=TEST_WALLET_SECRET, + filename=unique_wallet_filename, + overwrite_wallet_storage=True, + title=title, + ) + yield wallet, unique_wallet_filename + finally: + if wallet.is_open: + await wallet.wallet_close() + await wallet.stop() + cleanup_wallet_files(unique_wallet_filename) + cleanup_wallet_files(title) + + +@pytest_asyncio.fixture +async def connected_wallet(unique_wallet_filename, network_id, rpc_url): + """A started Wallet with a fresh wallet file opened and RPC connected. + + Yields (wallet, filename). Disconnects + closes + cleans up on teardown. + Requires `--rpc-url` to be passed (integration tier). + """ + wallet = Wallet(network_id=network_id, url=rpc_url) + await wallet.start() + title = unique_wallet_filename + try: + await wallet.wallet_create( + wallet_secret=TEST_WALLET_SECRET, + filename=unique_wallet_filename, + overwrite_wallet_storage=True, + title=title, + ) + await wallet.connect(block_async_connect=True) + yield wallet, unique_wallet_filename + finally: + if wallet.rpc.is_connected: + await wallet.disconnect() + if wallet.is_open: + await wallet.wallet_close() + await wallet.stop() + cleanup_wallet_files(unique_wallet_filename) + cleanup_wallet_files(title) diff --git a/tests/integration/test_wallet.py b/tests/integration/test_wallet.py new file mode 100644 index 00000000..4b28cc5d --- /dev/null +++ b/tests/integration/test_wallet.py @@ -0,0 +1,255 @@ +""" +Integration tests for RPC-dependent Wallet methods. + +Requires a reachable Kaspa node — run with `--network-id testnet-10 --rpc-url `. +The module skips entirely when `--rpc-url` is not supplied, so the unit tier +on a CI box without network access still passes. +""" + +import asyncio + +import pytest + +from kaspa import ( + AccountDescriptor, + AccountKind, + AccountsDiscoveryKind, + CommitRevealAddressKind, + Fees, + Wallet, +) + +from tests.conftest import TEST_MNEMONIC_PHRASE, TEST_WALLET_SECRET +from tests.wallet_helpers import create_mnemonic_key, create_secret_key + + +# Most of these tests exercise failure paths where the wallet has no UTXOs +# and the binding should raise — there is no need for a funded account. +SYNC_TIMEOUT_SECONDS = 60 + + +@pytest.fixture(autouse=True) +def _require_rpc_url(rpc_url): + """Skip this module entirely when no --rpc-url is supplied.""" + if not rpc_url: + pytest.skip("integration tests require --rpc-url") + + +async def _wait_for_sync(wallet: Wallet, timeout: float = SYNC_TIMEOUT_SECONDS) -> bool: + """Poll wallet.is_synced until True or timeout elapses.""" + deadline = asyncio.get_event_loop().time() + timeout + while asyncio.get_event_loop().time() < deadline: + if wallet.is_synced: + return True + await asyncio.sleep(0.5) + return wallet.is_synced + + +# ============================================================================= +# connect / disconnect +# ============================================================================= + + +class TestWalletConnect: + """Tests for Wallet.connect and Wallet.disconnect.""" + + async def test_connect_eventually_syncs(self, connected_wallet): + """Test that a connected wallet eventually flips is_synced True.""" + wallet, _ = connected_wallet + assert await _wait_for_sync(wallet) is True + + async def test_disconnect_clears_sync_state(self, connected_wallet): + """Test that disconnect drops is_synced back to False.""" + wallet, _ = connected_wallet + await _wait_for_sync(wallet) + await wallet.disconnect() + # is_synced is event-driven; allow a short grace period for the flip. + for _ in range(20): + if wallet.is_synced is False: + break + await asyncio.sleep(0.25) + assert wallet.is_synced is False + + +# ============================================================================= +# accounts_import_keypair +# ============================================================================= + + +class TestAccountsImportKeypair: + """Tests for Wallet.accounts_import_keypair (requires RPC).""" + + async def test_import_keypair_returns_keypair_descriptor(self, connected_wallet): + """Test importing a SecretKey-kind prv key data yields a keypair account.""" + wallet, _ = connected_wallet + pid = await create_secret_key(wallet) + descriptor = await wallet.accounts_import_keypair( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + ecdsa=False, + account_name="imported-keypair", + ) + assert isinstance(descriptor, AccountDescriptor) + assert descriptor.kind == AccountKind("keypair") + + +# ============================================================================= +# accounts_discovery +# ============================================================================= + + +class TestAccountsDiscovery: + """Tests for Wallet.accounts_discovery (requires RPC).""" + + async def test_discovery_with_known_mnemonic(self, connected_wallet): + """Test running discovery against a known mnemonic returns an index.""" + wallet, _ = connected_wallet + last_index = await wallet.accounts_discovery( + discovery_kind=AccountsDiscoveryKind.Bip44, + address_scan_extent=2, + account_scan_extent=1, + bip39_mnemonic=TEST_MNEMONIC_PHRASE, + ) + assert isinstance(last_index, int) + assert last_index >= 0 + + +# ============================================================================= +# accounts_estimate / accounts_send +# ============================================================================= + + +class TestAccountsEstimateAndSend: + """Tests for Wallet.accounts_estimate and Wallet.accounts_send (requires RPC).""" + + async def test_estimate_with_no_utxos_raises(self, connected_wallet): + """Test estimate on a fresh (unfunded) account raises insufficient-funds.""" + wallet, _ = connected_wallet + pid = await create_mnemonic_key(wallet) + descriptor = await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + ) + await wallet.accounts_activate([descriptor.account_id]) + with pytest.raises(Exception): + await wallet.accounts_estimate( + account_id=descriptor.account_id, + priority_fee_sompi=Fees(0), + ) + + async def test_send_with_no_utxos_raises(self, connected_wallet): + """Test send from a fresh (unfunded) account raises insufficient-funds.""" + wallet, _ = connected_wallet + pid = await create_mnemonic_key(wallet) + descriptor = await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + ) + await wallet.accounts_activate([descriptor.account_id]) + with pytest.raises(Exception): + await wallet.accounts_send( + wallet_secret=TEST_WALLET_SECRET, + account_id=descriptor.account_id, + priority_fee_sompi=Fees(0), + ) + + +# ============================================================================= +# accounts_get_utxos +# ============================================================================= + + +class TestAccountsGetUtxos: + """Tests for Wallet.accounts_get_utxos (requires RPC).""" + + async def test_get_utxos_on_fresh_account_returns_empty(self, connected_wallet): + """Test get_utxos on a freshly created account returns an empty list.""" + wallet, _ = connected_wallet + pid = await create_mnemonic_key(wallet) + descriptor = await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + ) + await wallet.accounts_activate([descriptor.account_id]) + utxos = await wallet.accounts_get_utxos(descriptor.account_id) + assert utxos == [] + + +# ============================================================================= +# accounts_transfer +# ============================================================================= + + +class TestAccountsTransfer: + """Tests for Wallet.accounts_transfer (requires RPC).""" + + async def test_transfer_with_no_utxos_raises(self, connected_wallet): + """Test transferring between two unfunded accounts raises insufficient-funds.""" + wallet, _ = connected_wallet + pid = await create_mnemonic_key(wallet) + source = await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + account_name="source", + account_index=0, + ) + destination = await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + account_name="destination", + account_index=1, + ) + await wallet.accounts_activate([source.account_id, destination.account_id]) + with pytest.raises(Exception): + await wallet.accounts_transfer( + wallet_secret=TEST_WALLET_SECRET, + source_account_id=source.account_id, + destination_account_id=destination.account_id, + transfer_amount_sompi=1, + ) + + +# ============================================================================= +# accounts_commit_reveal / accounts_commit_reveal_manual +# ============================================================================= + + +class TestAccountsCommitReveal: + """Tests for Wallet.accounts_commit_reveal and _manual (requires RPC).""" + + async def test_commit_reveal_on_fresh_account_raises(self, connected_wallet): + """Test commit_reveal on an unfunded account raises.""" + wallet, _ = connected_wallet + pid = await create_mnemonic_key(wallet) + descriptor = await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + ) + await wallet.accounts_activate([descriptor.account_id]) + with pytest.raises(Exception): + await wallet.accounts_commit_reveal( + wallet_secret=TEST_WALLET_SECRET, + account_id=descriptor.account_id, + address_type=CommitRevealAddressKind.Receive, + address_index=0, + script_sig=b"\x00", + commit_amount_sompi=1000, + reveal_fee_sompi=100, + ) + + async def test_commit_reveal_manual_on_fresh_account_raises(self, connected_wallet): + """Test commit_reveal_manual on an unfunded account raises.""" + wallet, _ = connected_wallet + pid = await create_mnemonic_key(wallet) + descriptor = await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + ) + await wallet.accounts_activate([descriptor.account_id]) + with pytest.raises(Exception): + await wallet.accounts_commit_reveal_manual( + wallet_secret=TEST_WALLET_SECRET, + account_id=descriptor.account_id, + script_sig=b"\x00", + reveal_fee_sompi=100, + ) diff --git a/tests/unit/test_wallet.py b/tests/unit/test_wallet.py new file mode 100644 index 00000000..02a52423 --- /dev/null +++ b/tests/unit/test_wallet.py @@ -0,0 +1,414 @@ +""" +Unit tests for the Wallet class. + +These tests exercise the local-store and in-process behavior of Wallet +without requiring a connected RPC node. +""" + +import pytest + +from kaspa import ( + Encoding, + NetworkId, + Resolver, + RpcClient, + Wallet, + WalletDescriptor, + WalletEventType, +) +from tests.conftest import ( + TEST_NEW_WALLET_SECRET, + TEST_WALLET_SECRET, + cleanup_wallet_files, +) + + +class TestWalletCreation: + """Tests for Wallet construction — focused on str-vs-object arg conversion.""" + + def test_create_wallet_with_string_network_id(self): + """Test creating a Wallet with a string network id.""" + wallet = Wallet(network_id="testnet-10") + assert isinstance(wallet, Wallet) + + def test_create_wallet_with_network_id_object(self): + """Test creating a Wallet with a NetworkId object.""" + wallet = Wallet(network_id=NetworkId("testnet-10")) + assert isinstance(wallet, Wallet) + + def test_create_wallet_with_encoding_enum(self): + """Test creating a Wallet with an Encoding enum.""" + wallet = Wallet(network_id="testnet-10", encoding=Encoding.Borsh) + assert isinstance(wallet, Wallet) + + def test_create_wallet_with_encoding_string(self): + """Test creating a Wallet with a string encoding.""" + wallet = Wallet(network_id="testnet-10", encoding="borsh") + assert isinstance(wallet, Wallet) + + +class TestWalletProperties: + """Tests for Wallet property accessors.""" + + def test_rpc_property_returns_rpc_client(self): + """Test that the rpc property returns an RpcClient.""" + wallet = Wallet(network_id="testnet-10", resolver=Resolver()) + assert isinstance(wallet.rpc, RpcClient) + + def test_is_open_false_before_any_wallet(self): + """Test is_open is False for a freshly created Wallet.""" + wallet = Wallet(network_id="testnet-10", resolver=Resolver()) + assert wallet.is_open is False + + def test_is_synced_false_before_connect(self): + """Test is_synced is False before the wallet connects.""" + wallet = Wallet(network_id="testnet-10", resolver=Resolver()) + assert wallet.is_synced is False + + def test_descriptor_is_none_before_open(self): + """Test descriptor is None before a wallet file is opened.""" + wallet = Wallet(network_id="testnet-10", resolver=Resolver()) + assert wallet.descriptor is None + + +class TestWalletSetNetworkId: + """Tests for Wallet.set_network_id.""" + + def test_set_network_id_with_string(self): + """Test setting network id with a string.""" + wallet = Wallet(network_id="testnet-10", resolver=Resolver()) + wallet.set_network_id("mainnet") + + def test_set_network_id_with_object(self): + """Test setting network id with a NetworkId instance.""" + wallet = Wallet(network_id="testnet-10", resolver=Resolver()) + wallet.set_network_id(NetworkId("testnet-10")) + + def test_set_network_id_invalid_raises(self): + """Test that setting an invalid network id raises an error.""" + wallet = Wallet(network_id="testnet-10", resolver=Resolver()) + with pytest.raises(Exception): + wallet.set_network_id("not-a-real-network") + + +class TestWalletEventListeners: + """Tests for Wallet event listener registration.""" + + @pytest.mark.parametrize("event", [WalletEventType.Balance, "balance"]) + def test_add_and_remove_listener(self, event): + """Test registering and removing a listener with both enum and string arg forms.""" + wallet = Wallet(network_id="testnet-10", resolver=Resolver()) + + def cb(event): + _ = event + + wallet.add_event_listener(event, cb) + wallet.remove_event_listener(event, cb) + + def test_add_listener_with_all_event(self): + """Test registering an 'all' listener that receives every event.""" + wallet = Wallet(network_id="testnet-10", resolver=Resolver()) + + def cb(event): + _ = event + + wallet.add_event_listener(WalletEventType.All, cb) + wallet.remove_event_listener(WalletEventType.All) + + def test_add_listener_with_args_and_kwargs(self): + """Test registering a listener with extra positional and keyword arguments.""" + wallet = Wallet(network_id="testnet-10", resolver=Resolver()) + + def cb(event, *args, **kwargs): + _ = (event, args, kwargs) + + wallet.add_event_listener("balance", cb, 1, 2, foo="bar") + wallet.remove_event_listener("balance", cb) + + def test_remove_listener_without_callback_clears_event(self): + """Test that remove without a callback removes every listener for the event.""" + wallet = Wallet(network_id="testnet-10", resolver=Resolver()) + + def cb(event): + _ = event + + wallet.add_event_listener("balance", cb) + wallet.remove_event_listener("balance") + + def test_remove_all_listeners_with_all(self): + """Test that remove with 'all' and no callback clears every listener.""" + wallet = Wallet(network_id="testnet-10", resolver=Resolver()) + + def cb(event): + _ = event + + wallet.add_event_listener("balance", cb) + wallet.add_event_listener("connect", cb) + wallet.remove_event_listener(WalletEventType.All) + + def test_add_listener_invalid_event_raises(self): + """Test that registering a listener for an unknown event raises.""" + wallet = Wallet(network_id="testnet-10", resolver=Resolver()) + + def cb(event): + _ = event + + with pytest.raises(Exception): + wallet.add_event_listener("not-a-real-event", cb) + + +class TestWalletExists: + """Tests for Wallet.exists.""" + + async def test_exists_returns_false_for_missing_file(self, started_wallet, unique_wallet_filename): + """Test exists() returns False when the wallet file does not exist.""" + assert await started_wallet.exists(unique_wallet_filename) is False + + async def test_exists_returns_true_after_create(self, open_wallet): + """Test exists() returns True after a wallet file is created.""" + wallet, filename = open_wallet + assert await wallet.exists(filename) is True + + +class TestWalletEnumerate: + """Tests for Wallet.wallet_enumerate.""" + + async def test_enumerate_returns_list(self, started_wallet): + """Test wallet_enumerate returns a list of WalletDescriptor.""" + result = await started_wallet.wallet_enumerate() + assert isinstance(result, list) + for entry in result: + assert isinstance(entry, WalletDescriptor) + + async def test_enumerate_includes_created_wallet(self, open_wallet): + """Test that an open (created) wallet appears in wallet_enumerate.""" + wallet, filename = open_wallet + descriptors = await wallet.wallet_enumerate() + filenames = [d.filename for d in descriptors] + assert filename in filenames + + +class TestWalletCreate: + """Tests for Wallet.wallet_create.""" + + async def test_create_wallet_file(self, started_wallet, unique_wallet_filename): + """Test creating a new wallet file.""" + try: + resp = await started_wallet.wallet_create( + wallet_secret=TEST_WALLET_SECRET, + filename=unique_wallet_filename, + overwrite_wallet_storage=True, + title="unit-test", + user_hint="hint", + ) + assert isinstance(resp, dict) + assert started_wallet.is_open is True + assert started_wallet.descriptor is not None + assert started_wallet.descriptor.filename == unique_wallet_filename + assert started_wallet.descriptor.title == "unit-test" + finally: + try: + await started_wallet.wallet_close() + except Exception: + pass + cleanup_wallet_files(unique_wallet_filename) + + async def test_create_wallet_without_overwrite_on_existing_raises( + self, started_wallet, unique_wallet_filename + ): + """Test that creating an existing wallet without overwrite raises.""" + try: + await started_wallet.wallet_create( + wallet_secret=TEST_WALLET_SECRET, + filename=unique_wallet_filename, + overwrite_wallet_storage=True, + ) + await started_wallet.wallet_close() + + with pytest.raises(Exception): + await started_wallet.wallet_create( + wallet_secret=TEST_WALLET_SECRET, + filename=unique_wallet_filename, + overwrite_wallet_storage=False, + ) + finally: + try: + await started_wallet.wallet_close() + except Exception: + pass + cleanup_wallet_files(unique_wallet_filename) + + +class TestWalletOpenClose: + """Tests for Wallet.wallet_open and wallet_close.""" + + async def test_open_after_close(self, open_wallet): + """Test that a wallet can be closed and re-opened.""" + wallet, filename = open_wallet + await wallet.wallet_close() + assert wallet.is_open is False + resp = await wallet.wallet_open(TEST_WALLET_SECRET, True, filename) + assert isinstance(resp, dict) + assert wallet.is_open is True + + async def test_open_with_wrong_password_raises(self, open_wallet): + """Test opening a wallet with the wrong password raises.""" + wallet, filename = open_wallet + await wallet.wallet_close() + with pytest.raises(Exception): + await wallet.wallet_open("wrong-password", False, filename) + + async def test_close_sets_is_open_false(self, open_wallet): + """Test that wallet_close flips is_open to False.""" + wallet, _ = open_wallet + await wallet.wallet_close() + assert wallet.is_open is False + assert wallet.descriptor is None + + +class TestWalletRename: + """Tests for Wallet.wallet_rename.""" + + async def test_rename_title(self, open_wallet): + """Test renaming the wallet title only.""" + wallet, _ = open_wallet + await wallet.wallet_rename(TEST_WALLET_SECRET, "renamed", None) + assert wallet.descriptor.title == "renamed" + + async def test_rename_filename(self, open_wallet): + """Test renaming both the wallet file on disk and reopening.""" + wallet, filename = open_wallet + new_filename = filename + "-renamed" + try: + await wallet.wallet_rename(TEST_WALLET_SECRET, None, new_filename) + assert wallet.descriptor.filename == new_filename + finally: + cleanup_wallet_files(new_filename) + + +class TestWalletChangeSecret: + """Tests for Wallet.wallet_change_secret.""" + + async def test_change_secret_allows_reopen_with_new_password(self, open_wallet): + """Test changing the password works and the new password can reopen.""" + wallet, filename = open_wallet + await wallet.wallet_change_secret(TEST_WALLET_SECRET, TEST_NEW_WALLET_SECRET) + await wallet.wallet_close() + + await wallet.wallet_open(TEST_NEW_WALLET_SECRET, False, filename) + assert wallet.is_open is True + + async def test_change_secret_with_wrong_old_password_raises(self, open_wallet): + """Test that providing the wrong old password raises.""" + wallet, _ = open_wallet + with pytest.raises(Exception): + await wallet.wallet_change_secret("wrong-old", TEST_NEW_WALLET_SECRET) + + +class TestWalletExportImport: + """Tests for Wallet.wallet_export and wallet_import.""" + + async def test_export_returns_hex_string(self, open_wallet): + """Test wallet_export returns a non-empty hex string.""" + wallet, _ = open_wallet + exported = await wallet.wallet_export(TEST_WALLET_SECRET, False) + assert isinstance(exported, str) + assert len(exported) > 0 + # valid hex + int(exported, 16) + + async def test_import_with_wrong_secret_raises(self, open_wallet): + """Test that importing with the wrong secret raises.""" + wallet, _ = open_wallet + exported = await wallet.wallet_export(TEST_WALLET_SECRET, False) + await wallet.wallet_close() + with pytest.raises(Exception): + await wallet.wallet_import("wrong-password", exported) + + +class TestWalletReload: + """Tests for Wallet.wallet_reload.""" + + @pytest.mark.parametrize("reactivate", [False, True]) + async def test_reload(self, open_wallet, reactivate): + """Test reloading an open wallet with both reactivate arg values.""" + wallet, _ = open_wallet + await wallet.wallet_reload(reactivate) + assert wallet.is_open is True + + +class TestWalletBatchFlush: + """Tests for Wallet.batch / flush.""" + + async def test_batch_then_flush(self, open_wallet): + """Test that batch followed by flush completes without error.""" + wallet, _ = open_wallet + await wallet.batch() + await wallet.flush(TEST_WALLET_SECRET) + + async def test_flush_with_wrong_password_raises(self, open_wallet): + """Test that flush with the wrong password raises.""" + wallet, _ = open_wallet + await wallet.batch() + with pytest.raises(Exception): + await wallet.flush("wrong-password") + + +class TestWalletRetainContext: + """Tests for Wallet.retain_context.""" + + async def test_retain_context_with_bytes(self, open_wallet): + """Test storing arbitrary bytes context data.""" + wallet, _ = open_wallet + await wallet.retain_context("ctx-name", b"payload-bytes") + + async def test_retain_context_with_none_clears(self, open_wallet): + """Test clearing a context entry with None data.""" + wallet, _ = open_wallet + await wallet.retain_context("ctx-name", b"payload-bytes") + await wallet.retain_context("ctx-name", None) + + +class TestWalletGetStatus: + """Tests for Wallet.get_status.""" + + async def test_get_status_default(self, open_wallet): + """Test get_status returns a dict with expected keys.""" + wallet, filename = open_wallet + status = await wallet.get_status() + assert isinstance(status, dict) + assert status.get("isOpen") is True + assert status.get("networkId") == "testnet-10" + descriptor = status.get("walletDescriptor") or {} + assert descriptor.get("filename") == filename + + async def test_get_status_with_name(self, open_wallet): + """Test get_status accepts an explicit name argument and reports the wallet.""" + wallet, filename = open_wallet + status = await wallet.get_status(filename) + descriptor = status.get("walletDescriptor") or {} + assert descriptor.get("filename") == filename + + +class TestWalletAddressBook: + """Tests for Wallet.address_book_enumerate.""" + + async def test_address_book_enumerate_raises_not_implemented(self, open_wallet): + """Test that address_book_enumerate currently raises NotImplemented upstream.""" + wallet, _ = open_wallet + with pytest.raises(Exception): + await wallet.address_book_enumerate() + + +class TestWalletFeeRatePoller: + """Tests for Wallet.fee_rate_poller_enable / fee_rate_poller_disable.""" + + async def test_fee_rate_poller_enable_and_disable(self, started_wallet): + """Test enabling and disabling the fee-rate poller offline.""" + await started_wallet.fee_rate_poller_enable(5) + await started_wallet.fee_rate_poller_disable() + + async def test_fee_rate_estimate_without_rpc_raises(self, started_wallet): + """Test fee_rate_estimate raises when not connected to RPC.""" + with pytest.raises(Exception): + await started_wallet.fee_rate_estimate() diff --git a/tests/unit/test_wallet_accounts.py b/tests/unit/test_wallet_accounts.py new file mode 100644 index 00000000..b8a914f2 --- /dev/null +++ b/tests/unit/test_wallet_accounts.py @@ -0,0 +1,510 @@ +""" +Unit tests for Wallet account and private-key-data operations. + +Covers prv_key_data_* and accounts_* methods that execute locally without +requiring a live RPC connection. +""" + +import pytest + +from kaspa import ( + AccountDescriptor, + AccountId, + AccountKind, + Address, + Mnemonic, + NetworkId, + NewAddressKind, + PrvKeyDataId, + PrvKeyDataInfo, + PrvKeyDataVariantKind, + TransactionKind, +) +from tests.conftest import TEST_WALLET_SECRET +from tests.wallet_helpers import ( + create_mnemonic_key as _create_mnemonic_key, + create_secret_key as _create_secret_key, +) + + +# ============================================================================= +# prv_key_data_* +# ============================================================================= + + +class TestPrvKeyDataCreate: + """Tests for Wallet.prv_key_data_create.""" + + async def test_create_mnemonic_returns_id(self, open_wallet): + """Test creating a Mnemonic-kind prv key data returns a PrvKeyDataId.""" + wallet, _ = open_wallet + pid = await _create_mnemonic_key(wallet) + assert isinstance(pid, PrvKeyDataId) + + async def test_create_secret_key_returns_id(self, open_wallet): + """Test creating a SecretKey-kind prv key data returns a PrvKeyDataId.""" + wallet, _ = open_wallet + pid = await _create_secret_key(wallet) + assert isinstance(pid, PrvKeyDataId) + + async def test_create_with_kind_string(self, open_wallet): + """Test that the kind argument accepts a string alias.""" + wallet, _ = open_wallet + pid = await wallet.prv_key_data_create( + wallet_secret=TEST_WALLET_SECRET, + secret=Mnemonic.random().phrase, + kind="mnemonic", + name="string-kind-test", + ) + assert isinstance(pid, PrvKeyDataId) + + async def test_create_with_invalid_kind_string_raises(self, open_wallet): + """Test that an unsupported kind string raises.""" + wallet, _ = open_wallet + with pytest.raises(Exception): + await wallet.prv_key_data_create( + wallet_secret=TEST_WALLET_SECRET, + secret=Mnemonic.random().phrase, + kind="not-a-real-kind", + ) + + async def test_create_with_wrong_wallet_secret_raises(self, open_wallet): + """Test that an incorrect wallet secret raises.""" + wallet, _ = open_wallet + with pytest.raises(Exception): + await wallet.prv_key_data_create( + wallet_secret="wrong-password", + secret=Mnemonic.random().phrase, + kind=PrvKeyDataVariantKind.Mnemonic, + ) + + +class TestPrvKeyDataEnumerate: + """Tests for Wallet.prv_key_data_enumerate.""" + + async def test_enumerate_empty(self, open_wallet): + """Test enumerate returns an empty list when no entries are stored.""" + wallet, _ = open_wallet + infos = await wallet.prv_key_data_enumerate() + assert infos == [] + + async def test_enumerate_includes_created_entries(self, open_wallet): + """Test enumerate lists every created entry as a PrvKeyDataInfo.""" + wallet, _ = open_wallet + pid_a = await _create_mnemonic_key(wallet, name="key-a") + pid_b = await _create_secret_key(wallet, name="key-b") + + infos = await wallet.prv_key_data_enumerate() + assert len(infos) == 2 + for info in infos: + assert isinstance(info, PrvKeyDataInfo) + + ids = {str(info.id) for info in infos} + assert str(pid_a) in ids + assert str(pid_b) in ids + + +class TestPrvKeyDataGet: + """Tests for Wallet.prv_key_data_get.""" + + async def test_get_returns_info(self, open_wallet): + """Test prv_key_data_get returns a PrvKeyDataInfo for a known id.""" + wallet, _ = open_wallet + pid = await _create_mnemonic_key(wallet, name="getter") + info = await wallet.prv_key_data_get(TEST_WALLET_SECRET, pid) + assert isinstance(info, PrvKeyDataInfo) + assert info.id == pid + assert info.name == "getter" + + async def test_get_accepts_hex_string_id(self, open_wallet): + """Test prv_key_data_get accepts a hex string id.""" + wallet, _ = open_wallet + pid = await _create_mnemonic_key(wallet) + info = await wallet.prv_key_data_get(TEST_WALLET_SECRET, str(pid)) + assert isinstance(info, PrvKeyDataInfo) + + async def test_get_with_wrong_password_raises(self, open_wallet): + """Test that an incorrect password raises.""" + wallet, _ = open_wallet + pid = await _create_mnemonic_key(wallet) + with pytest.raises(Exception): + await wallet.prv_key_data_get("wrong-password", pid) + + +class TestPrvKeyDataRemove: + """Tests for Wallet.prv_key_data_remove.""" + + async def test_remove_raises_not_implemented(self, open_wallet): + """Test that prv_key_data_remove currently raises NotImplemented upstream.""" + wallet, _ = open_wallet + pid = await _create_mnemonic_key(wallet) + with pytest.raises(Exception): + await wallet.prv_key_data_remove(TEST_WALLET_SECRET, pid) + + +# ============================================================================= +# accounts_* +# ============================================================================= + + +class TestAccountsCreate: + """Tests for Wallet.accounts_create_bip32 / accounts_create_keypair.""" + + async def test_create_bip32_returns_descriptor(self, open_wallet): + """Test accounts_create_bip32 returns an AccountDescriptor.""" + wallet, _ = open_wallet + pid = await _create_mnemonic_key(wallet) + descriptor = await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + account_name="bip32-acct", + ) + assert isinstance(descriptor, AccountDescriptor) + assert isinstance(descriptor.account_id, AccountId) + assert isinstance(descriptor.kind, AccountKind) + assert descriptor.account_name == "bip32-acct" + assert descriptor.receive_address is not None + assert descriptor.change_address is not None + + async def test_create_bip32_with_explicit_account_index(self, open_wallet): + """Test accounts_create_bip32 honors an explicit account_index. + + Two accounts derived from the same key but different indexes must + produce distinct addresses — otherwise the argument is ignored. + """ + wallet, _ = open_wallet + pid = await _create_mnemonic_key(wallet) + default_desc = await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + account_name="default", + account_index=0, + ) + indexed_desc = await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + account_name="indexed", + account_index=7, + ) + assert isinstance(indexed_desc, AccountDescriptor) + assert indexed_desc.account_id != default_desc.account_id + assert indexed_desc.receive_address != default_desc.receive_address + + async def test_create_keypair_returns_descriptor(self, open_wallet): + """Test accounts_create_keypair returns an AccountDescriptor.""" + wallet, _ = open_wallet + pid = await _create_secret_key(wallet) + descriptor = await wallet.accounts_create_keypair( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + ecdsa=False, + account_name="kp-acct", + ) + assert isinstance(descriptor, AccountDescriptor) + + async def test_create_bip32_with_wrong_password_raises(self, open_wallet): + """Test accounts_create_bip32 raises with the wrong wallet secret.""" + wallet, _ = open_wallet + pid = await _create_mnemonic_key(wallet) + with pytest.raises(Exception): + await wallet.accounts_create_bip32( + wallet_secret="wrong", + prv_key_data_id=pid, + ) + + +class TestAccountsImport: + """Tests for Wallet.accounts_import_bip32 / accounts_import_keypair.""" + + async def test_import_bip32_without_rpc_connection_raises(self, open_wallet): + """Test accounts_import_bip32 raises when no RPC connection is available.""" + wallet, _ = open_wallet + pid = await _create_mnemonic_key(wallet) + with pytest.raises(Exception): + await wallet.accounts_import_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + account_name="imported", + ) + + +class TestAccountsEnsureDefault: + """Tests for Wallet.accounts_ensure_default.""" + + async def test_ensure_default_bip32_creates_account(self, open_wallet): + """Test ensure_default with AccountKind('bip32') returns a descriptor.""" + wallet, _ = open_wallet + descriptor = await wallet.accounts_ensure_default( + wallet_secret=TEST_WALLET_SECRET, + account_kind=AccountKind("bip32"), + mnemonic_phrase=Mnemonic.random().phrase, + ) + assert isinstance(descriptor, AccountDescriptor) + + async def test_ensure_default_is_idempotent(self, open_wallet): + """Test ensure_default returns the existing account on a second call.""" + wallet, _ = open_wallet + first = await wallet.accounts_ensure_default( + wallet_secret=TEST_WALLET_SECRET, + account_kind=AccountKind("bip32"), + mnemonic_phrase=Mnemonic.random().phrase, + ) + second = await wallet.accounts_ensure_default( + wallet_secret=TEST_WALLET_SECRET, + account_kind=AccountKind("bip32"), + mnemonic_phrase=Mnemonic.random().phrase, + ) + assert first.account_id == second.account_id + + +class TestAccountsEnumerate: + """Tests for Wallet.accounts_enumerate.""" + + async def test_enumerate_empty(self, open_wallet): + """Test accounts_enumerate returns [] when no accounts exist.""" + wallet, _ = open_wallet + accounts = await wallet.accounts_enumerate() + assert accounts == [] + + async def test_enumerate_includes_created_accounts(self, open_wallet): + """Test accounts_enumerate returns an entry for every created account.""" + wallet, _ = open_wallet + pid = await _create_mnemonic_key(wallet) + await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + account_name="acct-1", + ) + accounts = await wallet.accounts_enumerate() + assert len(accounts) == 1 + assert isinstance(accounts[0], AccountDescriptor) + + +class TestAccountsGet: + """Tests for Wallet.accounts_get.""" + + async def test_accounts_get_with_account_id(self, open_wallet): + """Test accounts_get accepts an AccountId instance.""" + wallet, _ = open_wallet + pid = await _create_mnemonic_key(wallet) + descriptor = await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + ) + await wallet.accounts_get(descriptor.account_id) + + async def test_accounts_get_with_hex_string(self, open_wallet): + """Test accounts_get accepts a hex string id.""" + wallet, _ = open_wallet + pid = await _create_mnemonic_key(wallet) + descriptor = await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + ) + await wallet.accounts_get(str(descriptor.account_id)) + + async def test_accounts_get_missing_raises(self, open_wallet): + """Test accounts_get raises for an unknown account id.""" + wallet, _ = open_wallet + bogus = AccountId("0" * 64) + with pytest.raises(Exception): + await wallet.accounts_get(bogus) + + +class TestAccountsRename: + """Tests for Wallet.accounts_rename.""" + + async def test_rename_account(self, open_wallet): + """Test renaming an account updates its descriptor name.""" + wallet, _ = open_wallet + pid = await _create_mnemonic_key(wallet) + descriptor = await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + account_name="old-name", + ) + await wallet.accounts_rename( + wallet_secret=TEST_WALLET_SECRET, + account_id=descriptor.account_id, + name="new-name", + ) + accounts = await wallet.accounts_enumerate() + names = [a.account_name for a in accounts] + assert "new-name" in names + + async def test_rename_with_wrong_password_raises(self, open_wallet): + """Test accounts_rename raises with the wrong wallet secret.""" + wallet, _ = open_wallet + pid = await _create_mnemonic_key(wallet) + descriptor = await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + ) + with pytest.raises(Exception): + await wallet.accounts_rename( + wallet_secret="wrong", + account_id=descriptor.account_id, + name="whatever", + ) + # The failed rename leaves the store's modified flag set. + # Trigger a successful save via a valid rename so teardown's wallet_close + # doesn't panic. + # This feels like an upstream bug in rk native + await wallet.accounts_rename( + wallet_secret=TEST_WALLET_SECRET, + account_id=descriptor.account_id, + name="cleanup", + ) + + +class TestAccountsCreateNewAddress: + """Tests for Wallet.accounts_create_new_address.""" + + async def test_create_receive_address(self, open_wallet): + """Test deriving a new receive address returns an Address.""" + wallet, _ = open_wallet + pid = await _create_mnemonic_key(wallet) + descriptor = await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + ) + address = await wallet.accounts_create_new_address( + descriptor.account_id, NewAddressKind.Receive + ) + assert isinstance(address, Address) + assert address.prefix == "kaspatest" + + async def test_create_change_address_with_string_kind(self, open_wallet): + """Test deriving a change address with a string kind argument.""" + wallet, _ = open_wallet + pid = await _create_mnemonic_key(wallet) + descriptor = await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + ) + address = await wallet.accounts_create_new_address( + descriptor.account_id, "change" + ) + assert isinstance(address, Address) + + async def test_create_address_invalid_kind_string_raises(self, open_wallet): + """Test an unknown NewAddressKind string raises.""" + wallet, _ = open_wallet + pid = await _create_mnemonic_key(wallet) + descriptor = await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + ) + with pytest.raises(Exception): + await wallet.accounts_create_new_address( + descriptor.account_id, "not-a-real-kind" + ) + + +class TestAccountsActivate: + """Tests for Wallet.accounts_activate.""" + + async def test_activate_specific_account(self, open_wallet): + """Test activating a single account id.""" + wallet, _ = open_wallet + pid = await _create_mnemonic_key(wallet) + descriptor = await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + ) + await wallet.accounts_activate([descriptor.account_id]) + + async def test_activate_all_accounts(self, open_wallet): + """Test activating every account when account_ids is None.""" + wallet, _ = open_wallet + pid = await _create_mnemonic_key(wallet) + await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + ) + await wallet.accounts_activate() + + +# ============================================================================= +# transactions_* +# ============================================================================= + + +class TestTransactionsDataGet: + """Tests for Wallet.transactions_data_get.""" + + async def test_data_get_returns_empty_history(self, open_wallet): + """Test transactions_data_get returns an empty history for a new account.""" + wallet, _ = open_wallet + pid = await _create_mnemonic_key(wallet) + descriptor = await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + ) + resp = await wallet.transactions_data_get( + account_id=descriptor.account_id, + network_id=NetworkId("testnet-10"), + start=0, + end=10, + ) + assert isinstance(resp, dict) + assert resp.get("total") == 0 + assert resp.get("transactions") == [] + + async def test_data_get_accepts_filter(self, open_wallet): + """Test transactions_data_get accepts a list of TransactionKind filters.""" + wallet, _ = open_wallet + pid = await _create_mnemonic_key(wallet) + descriptor = await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + ) + resp = await wallet.transactions_data_get( + account_id=descriptor.account_id, + network_id=NetworkId("testnet-10"), + start=0, + end=10, + filter=[TransactionKind.Incoming, TransactionKind.Outgoing], + ) + assert isinstance(resp, dict) + assert resp.get("transactions") == [] + + +class TestTransactionsReplaceNote: + """Tests for Wallet.transactions_replace_note.""" + + async def test_replace_note_missing_transaction_raises(self, open_wallet): + """Test replacing a note on an unknown transaction id raises.""" + wallet, _ = open_wallet + pid = await _create_mnemonic_key(wallet) + descriptor = await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + ) + with pytest.raises(Exception): + await wallet.transactions_replace_note( + account_id=descriptor.account_id, + network_id=NetworkId("testnet-10"), + transaction_id="0" * 64, + note="note", + ) + + +class TestTransactionsReplaceMetadata: + """Tests for Wallet.transactions_replace_metadata.""" + + async def test_replace_metadata_missing_transaction_raises(self, open_wallet): + """Test replacing metadata on an unknown transaction id raises.""" + wallet, _ = open_wallet + pid = await _create_mnemonic_key(wallet) + descriptor = await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + ) + with pytest.raises(Exception): + await wallet.transactions_replace_metadata( + account_id=descriptor.account_id, + network_id=NetworkId("testnet-10"), + transaction_id="0" * 64, + metadata="meta", + ) diff --git a/tests/unit/test_wallet_types.py b/tests/unit/test_wallet_types.py new file mode 100644 index 00000000..7326e1d6 --- /dev/null +++ b/tests/unit/test_wallet_types.py @@ -0,0 +1,326 @@ +""" +Unit tests for Wallet-adjacent wrapper types. + +Covers AccountId, AccountKind, PrvKeyDataId, PrvKeyDataInfo, +AccountDescriptor, WalletDescriptor, and the Wallet-related enum +wrappers. +""" + +import pytest + +from kaspa import ( + AccountDescriptor, + AccountId, + AccountKind, + AccountsDiscoveryKind, + CommitRevealAddressKind, + Mnemonic, + NewAddressKind, + PrvKeyDataId, + PrvKeyDataInfo, + PrvKeyDataVariantKind, + TransactionKind, + WalletDescriptor, + WalletEventType, +) +from tests.conftest import TEST_WALLET_SECRET, cleanup_wallet_files + + +TEST_ACCOUNT_ID_HEX = "5f9ded675fea85011a084c3c80a209693840136b6f5a60e8691df6f7d181174d" +TEST_PRV_KEY_DATA_ID_HEX = "01d843d334c65c8f" + + +# ============================================================================= +# AccountId +# ============================================================================= + + +class TestAccountIdCreation: + """Tests for AccountId construction.""" + + def test_create_from_valid_hex(self): + """Test creating an AccountId from a valid hex string.""" + account_id = AccountId(TEST_ACCOUNT_ID_HEX) + assert isinstance(account_id, AccountId) + + def test_create_from_invalid_hex_raises(self): + """Test that an invalid hex string raises.""" + with pytest.raises(Exception): + AccountId("not-a-valid-hex-string") + + +class TestAccountIdStringification: + """Tests for AccountId string conversion.""" + + def test_str_returns_hex(self): + """Test __str__ returns the hex representation.""" + account_id = AccountId(TEST_ACCOUNT_ID_HEX) + assert str(account_id) == TEST_ACCOUNT_ID_HEX + + def test_repr_includes_hex(self): + """Test __repr__ includes the hex representation.""" + account_id = AccountId(TEST_ACCOUNT_ID_HEX) + text = repr(account_id) + assert len(text) > 0 + assert TEST_ACCOUNT_ID_HEX in text + + def test_to_hex_returns_hex(self): + """Test to_hex() returns the hex string.""" + account_id = AccountId(TEST_ACCOUNT_ID_HEX) + assert account_id.to_hex() == TEST_ACCOUNT_ID_HEX + + +class TestAccountIdEquality: + """Tests for AccountId equality semantics.""" + + def test_same_hex_is_equal(self): + """Test two AccountIds from the same hex compare equal.""" + assert AccountId(TEST_ACCOUNT_ID_HEX) == AccountId(TEST_ACCOUNT_ID_HEX) + + def test_different_hex_is_not_equal(self): + """Test two AccountIds from different hex compare not equal.""" + other_hex = "0" * 64 + assert AccountId(TEST_ACCOUNT_ID_HEX) != AccountId(other_hex) + + +# ============================================================================= +# AccountKind +# ============================================================================= + + +class TestAccountKind: + """Tests for AccountKind construction and stringification.""" + + @pytest.mark.parametrize("kind_str", ["legacy", "bip32", "multisig", "keypair"]) + def test_create_from_valid_string(self, kind_str): + """Test creating an AccountKind from every supported string.""" + kind = AccountKind(kind_str) + assert isinstance(kind, AccountKind) + + def test_create_from_invalid_string_raises(self): + """Test that an invalid kind string raises.""" + with pytest.raises(Exception): + AccountKind("not-a-real-kind") + + def test_stringification(self): + """Test __str__ and to_string() both return the canonical kind string.""" + kind = AccountKind("bip32") + assert str(kind) == "kaspa-bip32-standard" + assert kind.to_string() == "kaspa-bip32-standard" + + def test_equality(self): + """Test two AccountKinds built from the same string compare equal.""" + assert AccountKind("bip32") == AccountKind("bip32") + assert AccountKind("bip32") != AccountKind("keypair") + + def test_repr_includes_kind_name(self): + """Test __repr__ is non-empty and contains the kind name.""" + kind = AccountKind("bip32") + text = repr(kind) + assert len(text) > 0 + assert "bip32" in text + + +# ============================================================================= +# PrvKeyDataId +# ============================================================================= + + +class TestPrvKeyDataIdCreation: + """Tests for PrvKeyDataId construction.""" + + def test_create_from_valid_hex(self): + """Test creating a PrvKeyDataId from a valid hex string.""" + pid = PrvKeyDataId(TEST_PRV_KEY_DATA_ID_HEX) + assert isinstance(pid, PrvKeyDataId) + + def test_create_from_invalid_hex_raises(self): + """Test that an invalid hex string raises.""" + with pytest.raises(Exception): + PrvKeyDataId("xyz-not-hex") + + +class TestPrvKeyDataIdStringification: + """Tests for PrvKeyDataId string conversion.""" + + def test_str_returns_hex(self): + """Test __str__ returns the hex representation.""" + pid = PrvKeyDataId(TEST_PRV_KEY_DATA_ID_HEX) + assert str(pid) == TEST_PRV_KEY_DATA_ID_HEX + + def test_repr_includes_hex(self): + """Test __repr__ includes the hex representation.""" + pid = PrvKeyDataId(TEST_PRV_KEY_DATA_ID_HEX) + text = repr(pid) + assert len(text) > 0 + assert TEST_PRV_KEY_DATA_ID_HEX in text + + def test_to_hex_returns_hex(self): + """Test to_hex() returns the hex string.""" + pid = PrvKeyDataId(TEST_PRV_KEY_DATA_ID_HEX) + assert pid.to_hex() == TEST_PRV_KEY_DATA_ID_HEX + + +class TestPrvKeyDataIdEquality: + """Tests for PrvKeyDataId equality semantics.""" + + def test_same_hex_is_equal(self): + """Test two PrvKeyDataIds from the same hex compare equal.""" + assert PrvKeyDataId(TEST_PRV_KEY_DATA_ID_HEX) == PrvKeyDataId( + TEST_PRV_KEY_DATA_ID_HEX + ) + + def test_different_hex_is_not_equal(self): + """Test two PrvKeyDataIds from different hex compare not equal.""" + assert PrvKeyDataId(TEST_PRV_KEY_DATA_ID_HEX) != PrvKeyDataId("ffffffffffffffff") + + +# ============================================================================= +# PrvKeyDataInfo +# ============================================================================= + + +class TestPrvKeyDataInfo: + """Tests for PrvKeyDataInfo properties.""" + + async def test_info_properties(self, open_wallet): + """Test id/name/is_encrypted properties on a created entry.""" + wallet, _ = open_wallet + pid = await wallet.prv_key_data_create( + wallet_secret=TEST_WALLET_SECRET, + secret=Mnemonic.random().phrase, + kind=PrvKeyDataVariantKind.Mnemonic, + name="info-test", + ) + infos = await wallet.prv_key_data_enumerate() + assert len(infos) == 1 + info = infos[0] + assert isinstance(info, PrvKeyDataInfo) + assert info.id == pid + assert info.name == "info-test" + assert info.is_encrypted is False + + async def test_info_repr(self, open_wallet): + """Test __repr__ includes the id and name.""" + wallet, _ = open_wallet + await wallet.prv_key_data_create( + wallet_secret=TEST_WALLET_SECRET, + secret=Mnemonic.random().phrase, + kind=PrvKeyDataVariantKind.Mnemonic, + name="repr-test", + ) + info = (await wallet.prv_key_data_enumerate())[0] + text = repr(info) + assert "PrvKeyDataInfo" in text + assert "repr-test" in text + + +# ============================================================================= +# AccountDescriptor +# ============================================================================= + + +class TestAccountDescriptor: + """Tests for AccountDescriptor properties.""" + + async def test_descriptor_properties(self, open_wallet): + """Test the key AccountDescriptor getters on a newly created account.""" + wallet, _ = open_wallet + pid = await wallet.prv_key_data_create( + wallet_secret=TEST_WALLET_SECRET, + secret=Mnemonic.random().phrase, + kind=PrvKeyDataVariantKind.Mnemonic, + ) + descriptor = await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + account_name="acct-desc", + ) + assert isinstance(descriptor, AccountDescriptor) + assert isinstance(descriptor.account_id, AccountId) + assert isinstance(descriptor.kind, AccountKind) + assert descriptor.account_name == "acct-desc" + assert descriptor.balance is None + assert isinstance(descriptor.receive_address, str) + assert isinstance(descriptor.change_address, str) + + async def test_descriptor_repr(self, open_wallet): + """Test __repr__ includes the account kind and id.""" + wallet, _ = open_wallet + pid = await wallet.prv_key_data_create( + wallet_secret=TEST_WALLET_SECRET, + secret=Mnemonic.random().phrase, + kind=PrvKeyDataVariantKind.Mnemonic, + ) + descriptor = await wallet.accounts_create_bip32( + wallet_secret=TEST_WALLET_SECRET, + prv_key_data_id=pid, + account_name="acct-repr", + ) + text = repr(descriptor) + assert "AccountDescriptor" in text + assert "acct-repr" in text + assert str(descriptor.account_id) in text + + +# ============================================================================= +# WalletDescriptor +# ============================================================================= + + +class TestWalletDescriptor: + """Tests for WalletDescriptor properties.""" + + async def test_descriptor_properties(self, started_wallet, unique_wallet_filename): + """Test filename and title are independently plumbed on the descriptor.""" + distinct_title = f"title-{unique_wallet_filename}" + try: + await started_wallet.wallet_create( + wallet_secret=TEST_WALLET_SECRET, + filename=unique_wallet_filename, + overwrite_wallet_storage=True, + title=distinct_title, + ) + descriptor = started_wallet.descriptor + assert isinstance(descriptor, WalletDescriptor) + assert descriptor.filename == unique_wallet_filename + assert descriptor.title == distinct_title + finally: + cleanup_wallet_files(unique_wallet_filename) + + async def test_descriptor_repr_includes_filename(self, open_wallet): + """Test __repr__ includes the wallet filename.""" + wallet, filename = open_wallet + text = repr(wallet.descriptor) + assert "WalletDescriptor" in text + assert filename in text + + +# ============================================================================= +# Enum Wrapper Types +# ============================================================================= + + +@pytest.mark.parametrize( + "enum_cls,member", + [ + (WalletEventType, name) + for name in ("All", "Connect", "Disconnect", "WalletOpen", + "WalletClose", "Balance", "AccountCreate", "SyncState") + ] + + [(AccountsDiscoveryKind, "Bip44")] + + [(NewAddressKind, name) for name in ("Receive", "Change")] + + [(CommitRevealAddressKind, name) for name in ("Receive", "Change")] + + [ + (PrvKeyDataVariantKind, name) + for name in ("Mnemonic", "Bip39Seed", "ExtendedPrivateKey", "SecretKey") + ] + + [ + (TransactionKind, name) + for name in ("Incoming", "Outgoing", "External", "Change", "Batch", + "Reorg", "Stasis", "TransferIncoming", "TransferOutgoing") + ], +) +def test_enum_member_exposed(enum_cls, member): + """Every expected variant on the wallet-related enum wrappers is exposed.""" + assert getattr(enum_cls, member) is not None diff --git a/tests/wallet_helpers.py b/tests/wallet_helpers.py new file mode 100644 index 00000000..fd0f1f59 --- /dev/null +++ b/tests/wallet_helpers.py @@ -0,0 +1,43 @@ +""" +Shared helpers for wallet-related tests. + +Used by both `tests/unit/test_wallet_accounts.py` and +`tests/integration/test_wallet.py` to avoid duplicating the +prv_key_data_create boilerplate. +""" + +from kaspa import Mnemonic, PrvKeyDataId, PrvKeyDataVariantKind + +from tests.conftest import TEST_WALLET_SECRET + + +# The SecretKey variant of PrvKeyDataVariantKind passes the `secret` String +# straight to `Secret::from(secret)` (see `prv_key_data_create` in +# `src/wallet/core/wallet.rs`), so upstream wallet-core reads its UTF-8 +# bytes directly as the secp256k1 key material — NOT as a hex string. +# 32 repeated "a" chars = 32 bytes of 0x61, a valid secp256k1 scalar. +TEST_SECRET_KEY_RAW = "a" * 32 + + +async def create_mnemonic_key( + wallet, name: str = "unit-test-mnemonic", phrase: str | None = None +) -> PrvKeyDataId: + """Create a Mnemonic-kind prv key data entry and return its id.""" + return await wallet.prv_key_data_create( + wallet_secret=TEST_WALLET_SECRET, + secret=phrase if phrase is not None else Mnemonic.random().phrase, + kind=PrvKeyDataVariantKind.Mnemonic, + name=name, + ) + + +async def create_secret_key( + wallet, name: str = "unit-test-secret-key" +) -> PrvKeyDataId: + """Create a SecretKey-kind prv key data entry and return its id.""" + return await wallet.prv_key_data_create( + wallet_secret=TEST_WALLET_SECRET, + secret=TEST_SECRET_KEY_RAW, + kind=PrvKeyDataVariantKind.SecretKey, + name=name, + )