From deeca54f1df05d8354803cc1e0c91e4e2cc8bcd7 Mon Sep 17 00:00:00 2001 From: Sondov Engen Date: Thu, 22 Jan 2026 11:30:59 +0100 Subject: [PATCH 1/3] cli: add new `--add-ssh-key` functionality Adding new SSH keys to novem can be a bit painful, this simplifies the task Fixes: https://github.com/novem-code/novem-python/issues/165 --- novem/cli/__init__.py | 69 ++++++++- novem/cli/setup.py | 12 ++ tests/test_cli_ssh_key.py | 288 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 368 insertions(+), 1 deletion(-) create mode 100644 tests/test_cli_ssh_key.py diff --git a/novem/cli/__init__.py b/novem/cli/__init__.py index 3e038f3..a3c95a3 100644 --- a/novem/cli/__init__.py +++ b/novem/cli/__init__.py @@ -334,7 +334,6 @@ def run_cli_wrapped() -> None: Print some sytem information """ import platform - import socket # TODO: use argument supplied config path instead @@ -400,6 +399,74 @@ def run_cli_wrapped() -> None: print(info) return + # handle --add-ssh-key to add an SSH key for git access + if args and args.get("add_ssh_key"): + if args.get("profile"): + args["config_profile"] = args["profile"] + + key_arg = args["add_ssh_key"] + + # Determine key_id: use argument if string, otherwise default to lowercase hostname + if isinstance(key_arg, str): + key_id = key_arg.lower() + else: + key_id = socket.gethostname().lower() + + # Sanitize key_id: only lowercase alphanumeric, - and _ allowed + # Replace dots with dashes, strip other invalid characters + valid_chars = string.ascii_lowercase + string.digits + "-_" + key_id = key_id.replace(".", "-") + key_id = "".join(c for c in key_id if c in valid_chars) + + # Check if stdin is a TTY (interactive terminal with no piped input) + if sys.stdin.isatty(): + print("Error: No SSH key provided. Pipe your public key to stdin, e.g.:", file=sys.stderr) + print(" cat ~/.ssh/id_rsa.pub | novem --add-ssh-key", file=sys.stderr) + sys.exit(1) + + # Read SSH key from stdin + ssh_key = sys.stdin.read().strip() + if not ssh_key: + print("Error: No SSH key provided on stdin", file=sys.stderr) + sys.exit(1) + + novem = NovemAPI(**args, is_cli=True) + + # Check if key_id already exists + try: + existing_keys = novem.read("admin/keys") + # The keys are returned as newline-separated list of key IDs + existing_key_ids = [k.strip() for k in existing_keys.strip().split("\n") if k.strip()] + if key_id in existing_key_ids: + print( + f'Error: SSH key with id "{key_id}" already exists. ' + f"Please choose a different key id by passing it as an argument: --add-ssh-key ", + file=sys.stderr, + ) + sys.exit(1) + except Exception: + # If we can't read the keys list, proceed anyway (will fail at create if there's an issue) + pass + + # Create the key entry and write all fields + try: + novem.create(f"admin/keys/{key_id}") + novem.write(f"admin/keys/{key_id}/key", ssh_key) + + # Set the name to the hostname + hostname = socket.gethostname() + novem.write(f"admin/keys/{key_id}/name", hostname) + + # Set the summary + summary = f"{hostname} key created with novem cli v{__version__}" + novem.write(f"admin/keys/{key_id}/summary", summary) + except Exception as e: + print(f"Error: Failed to add SSH key: {e}", file=sys.stderr) + sys.exit(1) + + print(f'{cl.OKGREEN} \u2713 {cl.ENDC}SSH key "{cl.OKCYAN}{key_id}{cl.ENDC}" added successfully') + return + # handle --gql to run a GraphQL query from stdin, file, or inline # Only run standalone if no other commands are specified if args and args.get("gql"): diff --git a/novem/cli/setup.py b/novem/cli/setup.py index 708bea8..a025993 100644 --- a/novem/cli/setup.py +++ b/novem/cli/setup.py @@ -61,6 +61,18 @@ def setup(raw_args: Any = None) -> Tuple[Any, Dict[str, str]]: "or combine with -p/-g/-m/-j/-u to show debug output", ) + parser.add_argument( + "--add-ssh-key", + dest="add_ssh_key", + action="store", + required=False, + nargs="?", + const=True, + default=False, + help="Add an SSH key for git access. Reads key from stdin. " + "Optional argument specifies key ID (defaults to lowercase hostname)", + ) + parser.add_argument( "--dump", metavar=("OUT_PATH"), diff --git a/tests/test_cli_ssh_key.py b/tests/test_cli_ssh_key.py new file mode 100644 index 0000000..b14033a --- /dev/null +++ b/tests/test_cli_ssh_key.py @@ -0,0 +1,288 @@ +import pytest + +from novem.utils import API_ROOT + +from .conftest import CliExit +from .utils import write_config + +auth_req = { + "username": "demouser", + "password": "demopass", + "token_name": "demotoken", + "token_description": "test token", +} + + +def test_add_ssh_key_from_stdin(cli, requests_mock, fs, monkeypatch): + """Test --add-ssh-key reads key from stdin and creates key with hostname as id.""" + write_config(auth_req) + + # Mock hostname + monkeypatch.setattr("socket.gethostname", lambda: "TestHost") + + # Track API calls + api_calls = [] + + def track_get(request, context): + api_calls.append(("GET", request.path)) + # Return empty list of keys (no existing keys) + return "" + + def track_put(request, context): + api_calls.append(("PUT", request.path)) + return "" + + def track_post(request, context): + api_calls.append(("POST", request.path, request.text)) + return "" + + requests_mock.register_uri("GET", f"{API_ROOT}admin/keys", text=track_get) + requests_mock.register_uri("PUT", f"{API_ROOT}admin/keys/testhost", text=track_put) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/key", text=track_post) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/name", text=track_post) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/summary", text=track_post) + + ssh_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC... test@example.com" + out, err = cli("--add-ssh-key", stdin=ssh_key) + + # Verify API calls were made in correct order + assert ("GET", "/v1/admin/keys") in api_calls + assert ("PUT", "/v1/admin/keys/testhost") in api_calls + assert any( + call[0] == "POST" and call[1] == "/v1/admin/keys/testhost/key" and ssh_key in call[2] for call in api_calls + ) + assert any( + call[0] == "POST" and call[1] == "/v1/admin/keys/testhost/name" and "TestHost" in call[2] for call in api_calls + ) + assert any( + call[0] == "POST" and call[1] == "/v1/admin/keys/testhost/summary" and "TestHost" in call[2] + for call in api_calls + ) + + # Verify success message + assert "testhost" in out + assert "added successfully" in out + + +def test_add_ssh_key_with_custom_id(cli, requests_mock, fs, monkeypatch): + """Test --add-ssh-key with custom key id.""" + write_config(auth_req) + + monkeypatch.setattr("socket.gethostname", lambda: "TestHost") + + api_calls = [] + + def track_get(request, context): + api_calls.append(("GET", request.path)) + return "" + + def track_put(request, context): + api_calls.append(("PUT", request.path)) + return "" + + def track_post(request, context): + api_calls.append(("POST", request.path, request.text)) + return "" + + requests_mock.register_uri("GET", f"{API_ROOT}admin/keys", text=track_get) + requests_mock.register_uri("PUT", f"{API_ROOT}admin/keys/my-custom-key", text=track_put) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/my-custom-key/key", text=track_post) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/my-custom-key/name", text=track_post) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/my-custom-key/summary", text=track_post) + + ssh_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC..." + out, err = cli("--add-ssh-key", "my-custom-key", stdin=ssh_key) + + # Verify the custom key ID was used + assert ("PUT", "/v1/admin/keys/my-custom-key") in api_calls + assert "my-custom-key" in out + assert "added successfully" in out + + +def test_add_ssh_key_already_exists(cli, requests_mock, fs, monkeypatch): + """Test --add-ssh-key fails when key id already exists.""" + write_config(auth_req) + + monkeypatch.setattr("socket.gethostname", lambda: "TestHost") + + def return_existing_keys(request, context): + # Return existing keys list + return "testhost\notherkey" + + requests_mock.register_uri("GET", f"{API_ROOT}admin/keys", text=return_existing_keys) + + ssh_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC..." + + with pytest.raises(CliExit) as e: + cli("--add-ssh-key", stdin=ssh_key) + + out, err = e.value.args + assert e.value.code == 1 + assert "already exists" in err + assert "testhost" in err + + +def test_add_ssh_key_no_stdin(cli, requests_mock, fs, monkeypatch): + """Test --add-ssh-key fails when no key provided on stdin (whitespace only).""" + write_config(auth_req) + + monkeypatch.setattr("socket.gethostname", lambda: "TestHost") + + with pytest.raises(CliExit) as e: + # Provide whitespace-only stdin which should be stripped to empty + cli("--add-ssh-key", stdin=" \n \t ") + + out, err = e.value.args + assert e.value.code == 1 + assert "No SSH key provided" in err + + +def test_add_ssh_key_tty_error(cli, requests_mock, fs, monkeypatch): + """Test --add-ssh-key fails with helpful message when run interactively (TTY).""" + write_config(auth_req) + + monkeypatch.setattr("socket.gethostname", lambda: "TestHost") + # Simulate running in an interactive terminal (TTY) + monkeypatch.setattr("sys.stdin.isatty", lambda: True) + + with pytest.raises(CliExit) as e: + cli("--add-ssh-key") + + out, err = e.value.args + assert e.value.code == 1 + assert "No SSH key provided" in err + assert "cat ~/.ssh/id_rsa.pub" in err + + +def test_add_ssh_key_respects_profile(cli, requests_mock, fs, monkeypatch): + """Test --add-ssh-key respects --profile option.""" + # Write config with two profiles + conf = """\ +[general] +profile = user1 + +[app:cli] +version = 0.5.0 + +[profile:user1] +username = user1 +api_root = https://api1.test/v1 +token = token1 + +[profile:user2] +username = user2 +api_root = https://api2.test/v1 +token = token2 +""" + + import os + + from novem.utils import get_config_path + + cfolder, cpath = get_config_path() + os.makedirs(cfolder, exist_ok=True) + with open(cpath, "w") as f: + f.write(conf) + + monkeypatch.setattr("socket.gethostname", lambda: "TestHost") + + captured_tokens = [] + + def capture_get(request, context): + auth_header = request.headers.get("Authorization", "") + if auth_header.startswith("Bearer "): + captured_tokens.append(auth_header[7:]) + return "" + + def capture_other(request, context): + return "" + + # Register mocks for both API roots + for api_root in ["https://api1.test/v1/", "https://api2.test/v1/"]: + requests_mock.register_uri("GET", f"{api_root}admin/keys", text=capture_get) + requests_mock.register_uri("PUT", f"{api_root}admin/keys/testhost", text=capture_other) + requests_mock.register_uri("POST", f"{api_root}admin/keys/testhost/key", text=capture_other) + requests_mock.register_uri("POST", f"{api_root}admin/keys/testhost/name", text=capture_other) + requests_mock.register_uri("POST", f"{api_root}admin/keys/testhost/summary", text=capture_other) + + ssh_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC..." + + # Default profile (user1) + cli("--add-ssh-key", stdin=ssh_key) + assert captured_tokens[-1] == "token1" + + # Explicit profile user2 + cli("--profile", "user2", "--add-ssh-key", stdin=ssh_key) + assert captured_tokens[-1] == "token2" + + +def test_add_ssh_key_sanitizes_hostname(cli, requests_mock, fs, monkeypatch): + """Test --add-ssh-key sanitizes hostname with dots and special chars.""" + write_config(auth_req) + + # Hostname with dots (common on macOS like "imoen.local") + monkeypatch.setattr("socket.gethostname", lambda: "My-Host.local") + + api_calls = [] + + def track_get(request, context): + api_calls.append(("GET", request.path)) + return "" + + def track_put(request, context): + api_calls.append(("PUT", request.path)) + return "" + + def track_post(request, context): + api_calls.append(("POST", request.path)) + return "" + + # The sanitized key_id should be "my-host-local" (dots -> dashes, lowercase) + requests_mock.register_uri("GET", f"{API_ROOT}admin/keys", text=track_get) + requests_mock.register_uri("PUT", f"{API_ROOT}admin/keys/my-host-local", text=track_put) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/my-host-local/key", text=track_post) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/my-host-local/name", text=track_post) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/my-host-local/summary", text=track_post) + + ssh_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC..." + out, err = cli("--add-ssh-key", stdin=ssh_key) + + # Verify the sanitized key ID was used + assert ("PUT", "/v1/admin/keys/my-host-local") in api_calls + assert "my-host-local" in out + assert "added successfully" in out + + +def test_add_ssh_key_summary_includes_version(cli, requests_mock, fs, monkeypatch): + """Test --add-ssh-key includes version in summary.""" + write_config(auth_req) + + monkeypatch.setattr("socket.gethostname", lambda: "TestHost") + + summary_text = None + + def track_get(request, context): + return "" + + def track_put(request, context): + return "" + + def track_post(request, context): + nonlocal summary_text + if "summary" in request.path: + summary_text = request.text + return "" + + requests_mock.register_uri("GET", f"{API_ROOT}admin/keys", text=track_get) + requests_mock.register_uri("PUT", f"{API_ROOT}admin/keys/testhost", text=track_put) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/key", text=track_post) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/name", text=track_post) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/summary", text=track_post) + + ssh_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC..." + cli("--add-ssh-key", stdin=ssh_key) + + # Verify summary includes version + assert summary_text is not None + assert "TestHost" in summary_text + assert "novem cli v" in summary_text From 35f17946112d379ac92e1c186c42d897e61a8cc8 Mon Sep 17 00:00:00 2001 From: Sondov Engen Date: Thu, 22 Jan 2026 11:49:49 +0100 Subject: [PATCH 2/3] cli: improve error handling of `--add-ssh-key` --- novem/cli/__init__.py | 62 +++++++++++++++++++++++++++--------- tests/test_cli_ssh_key.py | 66 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 110 insertions(+), 18 deletions(-) diff --git a/novem/cli/__init__.py b/novem/cli/__init__.py index a3c95a3..bb620e6 100644 --- a/novem/cli/__init__.py +++ b/novem/cli/__init__.py @@ -448,23 +448,55 @@ def run_cli_wrapped() -> None: # If we can't read the keys list, proceed anyway (will fail at create if there's an issue) pass - # Create the key entry and write all fields - try: - novem.create(f"admin/keys/{key_id}") - novem.write(f"admin/keys/{key_id}/key", ssh_key) - - # Set the name to the hostname - hostname = socket.gethostname() - novem.write(f"admin/keys/{key_id}/name", hostname) - - # Set the summary - summary = f"{hostname} key created with novem cli v{__version__}" - novem.write(f"admin/keys/{key_id}/summary", summary) - except Exception as e: - print(f"Error: Failed to add SSH key: {e}", file=sys.stderr) + # Create the key entry - use session directly to check response + api_root = novem._api_root + session = novem._session + + # Try to create the key entry (409 means it already exists, which is fine - we'll update it) + is_update = False + r = session.put(f"{api_root}admin/keys/{key_id}") + if r.status_code == 409 or (r.text and "already exist" in r.text.lower()): + is_update = True + elif not r.ok: + try: + err = r.json().get("message", r.text) + except Exception: + err = r.text + print(f"Error: Failed to create SSH key: {err}", file=sys.stderr) + sys.exit(1) + + # Write the key content + r = session.post( + f"{api_root}admin/keys/{key_id}/key", + headers={"Content-type": "text/plain"}, + data=ssh_key.encode("utf-8"), + ) + if not r.ok or (r.text and "Failure" in r.text) or (r.text and "Duplicate" in r.text): + try: + err = r.json().get("message", r.text) + except Exception: + err = r.text + print(f"Error: Failed to add SSH key: {err}", file=sys.stderr) sys.exit(1) - print(f'{cl.OKGREEN} \u2713 {cl.ENDC}SSH key "{cl.OKCYAN}{key_id}{cl.ENDC}" added successfully') + # Set the name to the hostname + hostname = socket.gethostname() + session.post( + f"{api_root}admin/keys/{key_id}/name", + headers={"Content-type": "text/plain"}, + data=hostname.encode("utf-8"), + ) + + # Set the summary + action = "updated" if is_update else "created" + summary = f"{hostname} key {action} with novem cli v{__version__}" + session.post( + f"{api_root}admin/keys/{key_id}/summary", + headers={"Content-type": "text/plain"}, + data=summary.encode("utf-8"), + ) + + print(f'{cl.OKGREEN} \u2713 {cl.ENDC}SSH key "{cl.OKCYAN}{key_id}{cl.ENDC}" {action} successfully') return # handle --gql to run a GraphQL query from stdin, file, or inline diff --git a/tests/test_cli_ssh_key.py b/tests/test_cli_ssh_key.py index b14033a..5bbf51d 100644 --- a/tests/test_cli_ssh_key.py +++ b/tests/test_cli_ssh_key.py @@ -61,7 +61,7 @@ def track_post(request, context): # Verify success message assert "testhost" in out - assert "added successfully" in out + assert "successfully" in out def test_add_ssh_key_with_custom_id(cli, requests_mock, fs, monkeypatch): @@ -96,7 +96,7 @@ def track_post(request, context): # Verify the custom key ID was used assert ("PUT", "/v1/admin/keys/my-custom-key") in api_calls assert "my-custom-key" in out - assert "added successfully" in out + assert "successfully" in out def test_add_ssh_key_already_exists(cli, requests_mock, fs, monkeypatch): @@ -250,7 +250,67 @@ def track_post(request, context): # Verify the sanitized key ID was used assert ("PUT", "/v1/admin/keys/my-host-local") in api_calls assert "my-host-local" in out - assert "added successfully" in out + assert "successfully" in out + + +def test_add_ssh_key_update_existing(cli, requests_mock, fs, monkeypatch): + """Test --add-ssh-key updates existing key when it already exists (409).""" + write_config(auth_req) + + monkeypatch.setattr("socket.gethostname", lambda: "TestHost") + + def return_empty_keys(request, context): + return "" + + def return_conflict(request, context): + context.status_code = 409 + return '{"status": "Failure", "message": "The plot already exist!"}' + + def return_ok(request, context): + return "" + + requests_mock.register_uri("GET", f"{API_ROOT}admin/keys", text=return_empty_keys) + requests_mock.register_uri("PUT", f"{API_ROOT}admin/keys/testhost", text=return_conflict) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/key", text=return_ok) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/name", text=return_ok) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/summary", text=return_ok) + + ssh_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC..." + out, err = cli("--add-ssh-key", stdin=ssh_key) + + # Should succeed with "updated" message + assert "testhost" in out + assert "updated successfully" in out + + +def test_add_ssh_key_duplicate_key(cli, requests_mock, fs, monkeypatch): + """Test --add-ssh-key handles duplicate key content gracefully.""" + write_config(auth_req) + + monkeypatch.setattr("socket.gethostname", lambda: "TestHost") + + def return_empty_keys(request, context): + return "" + + def return_ok(request, context): + return "" + + def return_duplicate(request, context): + return '{"status": "Failure", "message": "Duplicate key detected, keys need to be unique."}' + + requests_mock.register_uri("GET", f"{API_ROOT}admin/keys", text=return_empty_keys) + requests_mock.register_uri("PUT", f"{API_ROOT}admin/keys/testhost", text=return_ok) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/key", text=return_duplicate) + + ssh_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC..." + + with pytest.raises(CliExit) as e: + cli("--add-ssh-key", stdin=ssh_key) + + out, err = e.value.args + assert e.value.code == 1 + assert "Failed to add SSH key" in err + assert "Duplicate" in err def test_add_ssh_key_summary_includes_version(cli, requests_mock, fs, monkeypatch): From e0f9bc71188ca3f1a2bd4dfd9f40ee6351fd3853 Mon Sep 17 00:00:00 2001 From: Sondov Engen Date: Thu, 22 Jan 2026 13:24:32 +0100 Subject: [PATCH 3/3] cli: name ssh key based on comment MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Explicit argument: --add-ssh-key my-key → uses my-key as key_id - SSH key comment: If key has user@host.com comment → uses user-host-com as key_id, user@host.com as name - Fallback: No argument, no comment → uses hostname for both --- novem/cli/__init__.py | 69 +++++++++++++------------ tests/test_cli_ssh_key.py | 106 ++++++++++++++++++-------------------- 2 files changed, 87 insertions(+), 88 deletions(-) diff --git a/novem/cli/__init__.py b/novem/cli/__init__.py index bb620e6..897b145 100644 --- a/novem/cli/__init__.py +++ b/novem/cli/__init__.py @@ -405,18 +405,7 @@ def run_cli_wrapped() -> None: args["config_profile"] = args["profile"] key_arg = args["add_ssh_key"] - - # Determine key_id: use argument if string, otherwise default to lowercase hostname - if isinstance(key_arg, str): - key_id = key_arg.lower() - else: - key_id = socket.gethostname().lower() - - # Sanitize key_id: only lowercase alphanumeric, - and _ allowed - # Replace dots with dashes, strip other invalid characters - valid_chars = string.ascii_lowercase + string.digits + "-_" - key_id = key_id.replace(".", "-") - key_id = "".join(c for c in key_id if c in valid_chars) + hostname = socket.gethostname() # Check if stdin is a TTY (interactive terminal with no piped input) if sys.stdin.isatty(): @@ -430,23 +419,38 @@ def run_cli_wrapped() -> None: print("Error: No SSH key provided on stdin", file=sys.stderr) sys.exit(1) - novem = NovemAPI(**args, is_cli=True) + # Try to extract comment from SSH key (format: ) + # The comment is often something like "user@hostname" + ssh_key_comment: Optional[str] = None + ssh_key_parts = ssh_key.split() + if len(ssh_key_parts) >= 3: + ssh_key_comment = " ".join(ssh_key_parts[2:]) + + # Determine key_id and name: + # 1. If explicit argument provided, use that for key_id + # 2. Otherwise, if SSH key has a comment, use that + # 3. Fall back to hostname + if isinstance(key_arg, str): + key_id = key_arg.lower() + key_name = key_arg + elif ssh_key_comment: + key_id = ssh_key_comment.lower() + key_name = ssh_key_comment + else: + key_id = hostname.lower() + key_name = hostname - # Check if key_id already exists - try: - existing_keys = novem.read("admin/keys") - # The keys are returned as newline-separated list of key IDs - existing_key_ids = [k.strip() for k in existing_keys.strip().split("\n") if k.strip()] - if key_id in existing_key_ids: - print( - f'Error: SSH key with id "{key_id}" already exists. ' - f"Please choose a different key id by passing it as an argument: --add-ssh-key ", - file=sys.stderr, - ) - sys.exit(1) - except Exception: - # If we can't read the keys list, proceed anyway (will fail at create if there's an issue) - pass + # Sanitize key_id: only lowercase alphanumeric, - and _ allowed + # Replace dots, @, and spaces with dashes, strip other invalid characters + valid_chars = string.ascii_lowercase + string.digits + "-_" + key_id = key_id.replace(".", "-").replace("@", "-").replace(" ", "-") + key_id = "".join(c for c in key_id if c in valid_chars) + # Collapse multiple dashes and strip leading/trailing dashes + while "--" in key_id: + key_id = key_id.replace("--", "-") + key_id = key_id.strip("-") + + novem = NovemAPI(**args, is_cli=True) # Create the key entry - use session directly to check response api_root = novem._api_root @@ -479,17 +483,16 @@ def run_cli_wrapped() -> None: print(f"Error: Failed to add SSH key: {err}", file=sys.stderr) sys.exit(1) - # Set the name to the hostname - hostname = socket.gethostname() + # Set the name (from comment or hostname) session.post( f"{api_root}admin/keys/{key_id}/name", headers={"Content-type": "text/plain"}, - data=hostname.encode("utf-8"), + data=key_name.encode("utf-8"), ) - # Set the summary + # Set the summary (include hostname since we're updating the key) action = "updated" if is_update else "created" - summary = f"{hostname} key {action} with novem cli v{__version__}" + summary = f"Key {action} from {hostname} with novem cli v{__version__}" session.post( f"{api_root}admin/keys/{key_id}/summary", headers={"Content-type": "text/plain"}, diff --git a/tests/test_cli_ssh_key.py b/tests/test_cli_ssh_key.py index 5bbf51d..cf2ee64 100644 --- a/tests/test_cli_ssh_key.py +++ b/tests/test_cli_ssh_key.py @@ -13,21 +13,14 @@ } -def test_add_ssh_key_from_stdin(cli, requests_mock, fs, monkeypatch): - """Test --add-ssh-key reads key from stdin and creates key with hostname as id.""" +def test_add_ssh_key_from_stdin_with_comment(cli, requests_mock, fs, monkeypatch): + """Test --add-ssh-key uses SSH key comment for key_id and name.""" write_config(auth_req) - # Mock hostname monkeypatch.setattr("socket.gethostname", lambda: "TestHost") - # Track API calls api_calls = [] - def track_get(request, context): - api_calls.append(("GET", request.path)) - # Return empty list of keys (no existing keys) - return "" - def track_put(request, context): api_calls.append(("PUT", request.path)) return "" @@ -36,30 +29,60 @@ def track_post(request, context): api_calls.append(("POST", request.path, request.text)) return "" - requests_mock.register_uri("GET", f"{API_ROOT}admin/keys", text=track_get) - requests_mock.register_uri("PUT", f"{API_ROOT}admin/keys/testhost", text=track_put) - requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/key", text=track_post) - requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/name", text=track_post) - requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/summary", text=track_post) + # SSH key comment "test@example.com" becomes key_id "test-example-com" + requests_mock.register_uri("PUT", f"{API_ROOT}admin/keys/test-example-com", text=track_put) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/test-example-com/key", text=track_post) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/test-example-com/name", text=track_post) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/test-example-com/summary", text=track_post) ssh_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC... test@example.com" out, err = cli("--add-ssh-key", stdin=ssh_key) - # Verify API calls were made in correct order - assert ("GET", "/v1/admin/keys") in api_calls - assert ("PUT", "/v1/admin/keys/testhost") in api_calls + # Verify key_id derived from comment + assert ("PUT", "/v1/admin/keys/test-example-com") in api_calls + # Name should be the original comment assert any( - call[0] == "POST" and call[1] == "/v1/admin/keys/testhost/key" and ssh_key in call[2] for call in api_calls - ) - assert any( - call[0] == "POST" and call[1] == "/v1/admin/keys/testhost/name" and "TestHost" in call[2] for call in api_calls + call[0] == "POST" and call[1] == "/v1/admin/keys/test-example-com/name" and "test@example.com" in call[2] + for call in api_calls ) + # Summary should include hostname assert any( - call[0] == "POST" and call[1] == "/v1/admin/keys/testhost/summary" and "TestHost" in call[2] + call[0] == "POST" and call[1] == "/v1/admin/keys/test-example-com/summary" and "TestHost" in call[2] for call in api_calls ) - # Verify success message + assert "test-example-com" in out + assert "successfully" in out + + +def test_add_ssh_key_from_stdin_no_comment(cli, requests_mock, fs, monkeypatch): + """Test --add-ssh-key falls back to hostname when SSH key has no comment.""" + write_config(auth_req) + + monkeypatch.setattr("socket.gethostname", lambda: "TestHost") + + api_calls = [] + + def track_put(request, context): + api_calls.append(("PUT", request.path)) + return "" + + def track_post(request, context): + api_calls.append(("POST", request.path, request.text)) + return "" + + # No comment in SSH key, should fall back to hostname + requests_mock.register_uri("PUT", f"{API_ROOT}admin/keys/testhost", text=track_put) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/key", text=track_post) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/name", text=track_post) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/summary", text=track_post) + + # SSH key without comment (only algo and key) + ssh_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC..." + out, err = cli("--add-ssh-key", stdin=ssh_key) + + # Verify hostname used as key_id + assert ("PUT", "/v1/admin/keys/testhost") in api_calls assert "testhost" in out assert "successfully" in out @@ -99,29 +122,6 @@ def track_post(request, context): assert "successfully" in out -def test_add_ssh_key_already_exists(cli, requests_mock, fs, monkeypatch): - """Test --add-ssh-key fails when key id already exists.""" - write_config(auth_req) - - monkeypatch.setattr("socket.gethostname", lambda: "TestHost") - - def return_existing_keys(request, context): - # Return existing keys list - return "testhost\notherkey" - - requests_mock.register_uri("GET", f"{API_ROOT}admin/keys", text=return_existing_keys) - - ssh_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC..." - - with pytest.raises(CliExit) as e: - cli("--add-ssh-key", stdin=ssh_key) - - out, err = e.value.args - assert e.value.code == 1 - assert "already exists" in err - assert "testhost" in err - - def test_add_ssh_key_no_stdin(cli, requests_mock, fs, monkeypatch): """Test --add-ssh-key fails when no key provided on stdin (whitespace only).""" write_config(auth_req) @@ -188,22 +188,18 @@ def test_add_ssh_key_respects_profile(cli, requests_mock, fs, monkeypatch): captured_tokens = [] - def capture_get(request, context): + def capture_token(request, context): auth_header = request.headers.get("Authorization", "") if auth_header.startswith("Bearer "): captured_tokens.append(auth_header[7:]) return "" - def capture_other(request, context): - return "" - # Register mocks for both API roots for api_root in ["https://api1.test/v1/", "https://api2.test/v1/"]: - requests_mock.register_uri("GET", f"{api_root}admin/keys", text=capture_get) - requests_mock.register_uri("PUT", f"{api_root}admin/keys/testhost", text=capture_other) - requests_mock.register_uri("POST", f"{api_root}admin/keys/testhost/key", text=capture_other) - requests_mock.register_uri("POST", f"{api_root}admin/keys/testhost/name", text=capture_other) - requests_mock.register_uri("POST", f"{api_root}admin/keys/testhost/summary", text=capture_other) + requests_mock.register_uri("PUT", f"{api_root}admin/keys/testhost", text=capture_token) + requests_mock.register_uri("POST", f"{api_root}admin/keys/testhost/key", text=capture_token) + requests_mock.register_uri("POST", f"{api_root}admin/keys/testhost/name", text=capture_token) + requests_mock.register_uri("POST", f"{api_root}admin/keys/testhost/summary", text=capture_token) ssh_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC..."