diff --git a/novem/cli/__init__.py b/novem/cli/__init__.py index 3e038f3..897b145 100644 --- a/novem/cli/__init__.py +++ b/novem/cli/__init__.py @@ -334,7 +334,6 @@ def run_cli_wrapped() -> None: Print some sytem information """ import platform - import socket # TODO: use argument supplied config path instead @@ -400,6 +399,109 @@ def run_cli_wrapped() -> None: print(info) return + # handle --add-ssh-key to add an SSH key for git access + if args and args.get("add_ssh_key"): + if args.get("profile"): + args["config_profile"] = args["profile"] + + key_arg = args["add_ssh_key"] + hostname = socket.gethostname() + + # Check if stdin is a TTY (interactive terminal with no piped input) + if sys.stdin.isatty(): + print("Error: No SSH key provided. Pipe your public key to stdin, e.g.:", file=sys.stderr) + print(" cat ~/.ssh/id_rsa.pub | novem --add-ssh-key", file=sys.stderr) + sys.exit(1) + + # Read SSH key from stdin + ssh_key = sys.stdin.read().strip() + if not ssh_key: + print("Error: No SSH key provided on stdin", file=sys.stderr) + sys.exit(1) + + # Try to extract comment from SSH key (format: ) + # The comment is often something like "user@hostname" + ssh_key_comment: Optional[str] = None + ssh_key_parts = ssh_key.split() + if len(ssh_key_parts) >= 3: + ssh_key_comment = " ".join(ssh_key_parts[2:]) + + # Determine key_id and name: + # 1. If explicit argument provided, use that for key_id + # 2. Otherwise, if SSH key has a comment, use that + # 3. Fall back to hostname + if isinstance(key_arg, str): + key_id = key_arg.lower() + key_name = key_arg + elif ssh_key_comment: + key_id = ssh_key_comment.lower() + key_name = ssh_key_comment + else: + key_id = hostname.lower() + key_name = hostname + + # Sanitize key_id: only lowercase alphanumeric, - and _ allowed + # Replace dots, @, and spaces with dashes, strip other invalid characters + valid_chars = string.ascii_lowercase + string.digits + "-_" + key_id = key_id.replace(".", "-").replace("@", "-").replace(" ", "-") + key_id = "".join(c for c in key_id if c in valid_chars) + # Collapse multiple dashes and strip leading/trailing dashes + while "--" in key_id: + key_id = key_id.replace("--", "-") + key_id = key_id.strip("-") + + novem = NovemAPI(**args, is_cli=True) + + # Create the key entry - use session directly to check response + api_root = novem._api_root + session = novem._session + + # Try to create the key entry (409 means it already exists, which is fine - we'll update it) + is_update = False + r = session.put(f"{api_root}admin/keys/{key_id}") + if r.status_code == 409 or (r.text and "already exist" in r.text.lower()): + is_update = True + elif not r.ok: + try: + err = r.json().get("message", r.text) + except Exception: + err = r.text + print(f"Error: Failed to create SSH key: {err}", file=sys.stderr) + sys.exit(1) + + # Write the key content + r = session.post( + f"{api_root}admin/keys/{key_id}/key", + headers={"Content-type": "text/plain"}, + data=ssh_key.encode("utf-8"), + ) + if not r.ok or (r.text and "Failure" in r.text) or (r.text and "Duplicate" in r.text): + try: + err = r.json().get("message", r.text) + except Exception: + err = r.text + print(f"Error: Failed to add SSH key: {err}", file=sys.stderr) + sys.exit(1) + + # Set the name (from comment or hostname) + session.post( + f"{api_root}admin/keys/{key_id}/name", + headers={"Content-type": "text/plain"}, + data=key_name.encode("utf-8"), + ) + + # Set the summary (include hostname since we're updating the key) + action = "updated" if is_update else "created" + summary = f"Key {action} from {hostname} with novem cli v{__version__}" + session.post( + f"{api_root}admin/keys/{key_id}/summary", + headers={"Content-type": "text/plain"}, + data=summary.encode("utf-8"), + ) + + print(f'{cl.OKGREEN} \u2713 {cl.ENDC}SSH key "{cl.OKCYAN}{key_id}{cl.ENDC}" {action} successfully') + return + # handle --gql to run a GraphQL query from stdin, file, or inline # Only run standalone if no other commands are specified if args and args.get("gql"): diff --git a/novem/cli/setup.py b/novem/cli/setup.py index 708bea8..a025993 100644 --- a/novem/cli/setup.py +++ b/novem/cli/setup.py @@ -61,6 +61,18 @@ def setup(raw_args: Any = None) -> Tuple[Any, Dict[str, str]]: "or combine with -p/-g/-m/-j/-u to show debug output", ) + parser.add_argument( + "--add-ssh-key", + dest="add_ssh_key", + action="store", + required=False, + nargs="?", + const=True, + default=False, + help="Add an SSH key for git access. Reads key from stdin. " + "Optional argument specifies key ID (defaults to lowercase hostname)", + ) + parser.add_argument( "--dump", metavar=("OUT_PATH"), diff --git a/tests/test_cli_ssh_key.py b/tests/test_cli_ssh_key.py new file mode 100644 index 0000000..cf2ee64 --- /dev/null +++ b/tests/test_cli_ssh_key.py @@ -0,0 +1,344 @@ +import pytest + +from novem.utils import API_ROOT + +from .conftest import CliExit +from .utils import write_config + +auth_req = { + "username": "demouser", + "password": "demopass", + "token_name": "demotoken", + "token_description": "test token", +} + + +def test_add_ssh_key_from_stdin_with_comment(cli, requests_mock, fs, monkeypatch): + """Test --add-ssh-key uses SSH key comment for key_id and name.""" + write_config(auth_req) + + monkeypatch.setattr("socket.gethostname", lambda: "TestHost") + + api_calls = [] + + def track_put(request, context): + api_calls.append(("PUT", request.path)) + return "" + + def track_post(request, context): + api_calls.append(("POST", request.path, request.text)) + return "" + + # SSH key comment "test@example.com" becomes key_id "test-example-com" + requests_mock.register_uri("PUT", f"{API_ROOT}admin/keys/test-example-com", text=track_put) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/test-example-com/key", text=track_post) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/test-example-com/name", text=track_post) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/test-example-com/summary", text=track_post) + + ssh_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC... test@example.com" + out, err = cli("--add-ssh-key", stdin=ssh_key) + + # Verify key_id derived from comment + assert ("PUT", "/v1/admin/keys/test-example-com") in api_calls + # Name should be the original comment + assert any( + call[0] == "POST" and call[1] == "/v1/admin/keys/test-example-com/name" and "test@example.com" in call[2] + for call in api_calls + ) + # Summary should include hostname + assert any( + call[0] == "POST" and call[1] == "/v1/admin/keys/test-example-com/summary" and "TestHost" in call[2] + for call in api_calls + ) + + assert "test-example-com" in out + assert "successfully" in out + + +def test_add_ssh_key_from_stdin_no_comment(cli, requests_mock, fs, monkeypatch): + """Test --add-ssh-key falls back to hostname when SSH key has no comment.""" + write_config(auth_req) + + monkeypatch.setattr("socket.gethostname", lambda: "TestHost") + + api_calls = [] + + def track_put(request, context): + api_calls.append(("PUT", request.path)) + return "" + + def track_post(request, context): + api_calls.append(("POST", request.path, request.text)) + return "" + + # No comment in SSH key, should fall back to hostname + requests_mock.register_uri("PUT", f"{API_ROOT}admin/keys/testhost", text=track_put) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/key", text=track_post) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/name", text=track_post) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/summary", text=track_post) + + # SSH key without comment (only algo and key) + ssh_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC..." + out, err = cli("--add-ssh-key", stdin=ssh_key) + + # Verify hostname used as key_id + assert ("PUT", "/v1/admin/keys/testhost") in api_calls + assert "testhost" in out + assert "successfully" in out + + +def test_add_ssh_key_with_custom_id(cli, requests_mock, fs, monkeypatch): + """Test --add-ssh-key with custom key id.""" + write_config(auth_req) + + monkeypatch.setattr("socket.gethostname", lambda: "TestHost") + + api_calls = [] + + def track_get(request, context): + api_calls.append(("GET", request.path)) + return "" + + def track_put(request, context): + api_calls.append(("PUT", request.path)) + return "" + + def track_post(request, context): + api_calls.append(("POST", request.path, request.text)) + return "" + + requests_mock.register_uri("GET", f"{API_ROOT}admin/keys", text=track_get) + requests_mock.register_uri("PUT", f"{API_ROOT}admin/keys/my-custom-key", text=track_put) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/my-custom-key/key", text=track_post) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/my-custom-key/name", text=track_post) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/my-custom-key/summary", text=track_post) + + ssh_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC..." + out, err = cli("--add-ssh-key", "my-custom-key", stdin=ssh_key) + + # Verify the custom key ID was used + assert ("PUT", "/v1/admin/keys/my-custom-key") in api_calls + assert "my-custom-key" in out + assert "successfully" in out + + +def test_add_ssh_key_no_stdin(cli, requests_mock, fs, monkeypatch): + """Test --add-ssh-key fails when no key provided on stdin (whitespace only).""" + write_config(auth_req) + + monkeypatch.setattr("socket.gethostname", lambda: "TestHost") + + with pytest.raises(CliExit) as e: + # Provide whitespace-only stdin which should be stripped to empty + cli("--add-ssh-key", stdin=" \n \t ") + + out, err = e.value.args + assert e.value.code == 1 + assert "No SSH key provided" in err + + +def test_add_ssh_key_tty_error(cli, requests_mock, fs, monkeypatch): + """Test --add-ssh-key fails with helpful message when run interactively (TTY).""" + write_config(auth_req) + + monkeypatch.setattr("socket.gethostname", lambda: "TestHost") + # Simulate running in an interactive terminal (TTY) + monkeypatch.setattr("sys.stdin.isatty", lambda: True) + + with pytest.raises(CliExit) as e: + cli("--add-ssh-key") + + out, err = e.value.args + assert e.value.code == 1 + assert "No SSH key provided" in err + assert "cat ~/.ssh/id_rsa.pub" in err + + +def test_add_ssh_key_respects_profile(cli, requests_mock, fs, monkeypatch): + """Test --add-ssh-key respects --profile option.""" + # Write config with two profiles + conf = """\ +[general] +profile = user1 + +[app:cli] +version = 0.5.0 + +[profile:user1] +username = user1 +api_root = https://api1.test/v1 +token = token1 + +[profile:user2] +username = user2 +api_root = https://api2.test/v1 +token = token2 +""" + + import os + + from novem.utils import get_config_path + + cfolder, cpath = get_config_path() + os.makedirs(cfolder, exist_ok=True) + with open(cpath, "w") as f: + f.write(conf) + + monkeypatch.setattr("socket.gethostname", lambda: "TestHost") + + captured_tokens = [] + + def capture_token(request, context): + auth_header = request.headers.get("Authorization", "") + if auth_header.startswith("Bearer "): + captured_tokens.append(auth_header[7:]) + return "" + + # Register mocks for both API roots + for api_root in ["https://api1.test/v1/", "https://api2.test/v1/"]: + requests_mock.register_uri("PUT", f"{api_root}admin/keys/testhost", text=capture_token) + requests_mock.register_uri("POST", f"{api_root}admin/keys/testhost/key", text=capture_token) + requests_mock.register_uri("POST", f"{api_root}admin/keys/testhost/name", text=capture_token) + requests_mock.register_uri("POST", f"{api_root}admin/keys/testhost/summary", text=capture_token) + + ssh_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC..." + + # Default profile (user1) + cli("--add-ssh-key", stdin=ssh_key) + assert captured_tokens[-1] == "token1" + + # Explicit profile user2 + cli("--profile", "user2", "--add-ssh-key", stdin=ssh_key) + assert captured_tokens[-1] == "token2" + + +def test_add_ssh_key_sanitizes_hostname(cli, requests_mock, fs, monkeypatch): + """Test --add-ssh-key sanitizes hostname with dots and special chars.""" + write_config(auth_req) + + # Hostname with dots (common on macOS like "imoen.local") + monkeypatch.setattr("socket.gethostname", lambda: "My-Host.local") + + api_calls = [] + + def track_get(request, context): + api_calls.append(("GET", request.path)) + return "" + + def track_put(request, context): + api_calls.append(("PUT", request.path)) + return "" + + def track_post(request, context): + api_calls.append(("POST", request.path)) + return "" + + # The sanitized key_id should be "my-host-local" (dots -> dashes, lowercase) + requests_mock.register_uri("GET", f"{API_ROOT}admin/keys", text=track_get) + requests_mock.register_uri("PUT", f"{API_ROOT}admin/keys/my-host-local", text=track_put) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/my-host-local/key", text=track_post) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/my-host-local/name", text=track_post) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/my-host-local/summary", text=track_post) + + ssh_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC..." + out, err = cli("--add-ssh-key", stdin=ssh_key) + + # Verify the sanitized key ID was used + assert ("PUT", "/v1/admin/keys/my-host-local") in api_calls + assert "my-host-local" in out + assert "successfully" in out + + +def test_add_ssh_key_update_existing(cli, requests_mock, fs, monkeypatch): + """Test --add-ssh-key updates existing key when it already exists (409).""" + write_config(auth_req) + + monkeypatch.setattr("socket.gethostname", lambda: "TestHost") + + def return_empty_keys(request, context): + return "" + + def return_conflict(request, context): + context.status_code = 409 + return '{"status": "Failure", "message": "The plot already exist!"}' + + def return_ok(request, context): + return "" + + requests_mock.register_uri("GET", f"{API_ROOT}admin/keys", text=return_empty_keys) + requests_mock.register_uri("PUT", f"{API_ROOT}admin/keys/testhost", text=return_conflict) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/key", text=return_ok) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/name", text=return_ok) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/summary", text=return_ok) + + ssh_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC..." + out, err = cli("--add-ssh-key", stdin=ssh_key) + + # Should succeed with "updated" message + assert "testhost" in out + assert "updated successfully" in out + + +def test_add_ssh_key_duplicate_key(cli, requests_mock, fs, monkeypatch): + """Test --add-ssh-key handles duplicate key content gracefully.""" + write_config(auth_req) + + monkeypatch.setattr("socket.gethostname", lambda: "TestHost") + + def return_empty_keys(request, context): + return "" + + def return_ok(request, context): + return "" + + def return_duplicate(request, context): + return '{"status": "Failure", "message": "Duplicate key detected, keys need to be unique."}' + + requests_mock.register_uri("GET", f"{API_ROOT}admin/keys", text=return_empty_keys) + requests_mock.register_uri("PUT", f"{API_ROOT}admin/keys/testhost", text=return_ok) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/key", text=return_duplicate) + + ssh_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC..." + + with pytest.raises(CliExit) as e: + cli("--add-ssh-key", stdin=ssh_key) + + out, err = e.value.args + assert e.value.code == 1 + assert "Failed to add SSH key" in err + assert "Duplicate" in err + + +def test_add_ssh_key_summary_includes_version(cli, requests_mock, fs, monkeypatch): + """Test --add-ssh-key includes version in summary.""" + write_config(auth_req) + + monkeypatch.setattr("socket.gethostname", lambda: "TestHost") + + summary_text = None + + def track_get(request, context): + return "" + + def track_put(request, context): + return "" + + def track_post(request, context): + nonlocal summary_text + if "summary" in request.path: + summary_text = request.text + return "" + + requests_mock.register_uri("GET", f"{API_ROOT}admin/keys", text=track_get) + requests_mock.register_uri("PUT", f"{API_ROOT}admin/keys/testhost", text=track_put) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/key", text=track_post) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/name", text=track_post) + requests_mock.register_uri("POST", f"{API_ROOT}admin/keys/testhost/summary", text=track_post) + + ssh_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQC..." + cli("--add-ssh-key", stdin=ssh_key) + + # Verify summary includes version + assert summary_text is not None + assert "TestHost" in summary_text + assert "novem cli v" in summary_text