diff --git a/README.md b/README.md
index 73887aa..02710ce 100644
--- a/README.md
+++ b/README.md
@@ -132,6 +132,40 @@ CPU Features:
 - asimddp: Advanced SIMD Dot Product - SIMD instructions for dot product operations, useful for machine learning workloads.
 ```
 
+## Agentic mode with `--agentic`
+
+For goals that need **several** shell steps (investigate, then drill down, then summarize), use agentic mode. You describe the outcome you want; the AI suggests one command at a time. After each run, **stdout and stderr** are sent back through the tunnel to your laptop, and the model either proposes the next command or returns a **final answer** when it has enough information.
+
+This is the same safety model as plain `q`: each command is shown in bold cyan, and nothing runs until you confirm at the `[Y/n]` prompt (press `n` to abort).
+
+### Usage
+
+```bash
+q --agentic <goal>
+```
+
+Optional environment variables (read in the **shell where `q` runs**, usually on the board) are listed in [Environment variables](#environment-variables) under `SSHQ_AGENTIC_*`.
+
+### Example
+
+```bash
+$ q --agentic analyze the process consuming the most CPU over the last 15 minutes
+
+[agentic step 1/25]
+pidstat 1 1
+
+Run this command? [Y/n] y
+(... command output appears here ...)
+
+[agentic step 2/25]
+grep ...
+
+Run this command? [Y/n] y
+(...)
+
+Based on the command output, the process that consumed the most CPU over the last 15 minutes was ...
+```
+
 ## Local / RamaLama
 
 You can run inference entirely on your machine using [RamaLama](https://ramalama.ai/) (or any OpenAI-compatible server like Ollama or llama.cpp). No API keys are required.
@@ -171,3 +205,5 @@ The local backend uses the same OpenAI `chat/completions` API that RamaLama’s
 | `GEMINI_API_KEY` | Yes (if neither local nor Groq) | — | Your Gemini API key. |
 | `SSHQ_GEMINI_MODEL` | No | `gemini-2.5-flash` | Gemini model (e.g. `gemini-2.5-flash-lite` for higher quota). |
 | `SSHQ_GROQ_MODEL` | No | `llama-3.3-70b-versatile` | Groq model (e.g. `llama-3.1-8b-instant` for faster replies). |
+| `SSHQ_AGENTIC_MAX_STEPS` | No | `25` | Maximum suggest/run rounds for `q --agentic` (read in the shell where `q` runs). |
+| `SSHQ_AGENTIC_MAX_OUTPUT_CHARS` | No | `32000` | Per-step cap on captured stdout/stderr sent back to the model (each stream gets half the budget before truncation). |
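Not part of the patch: a minimal Python sketch of the suggest/run round trip the README describes. `post_agentic` is a hypothetical stand-in for the real `urllib` POST that `q` sends through the tunnel; the payload and response shapes match the `/agentic` handler in `src/sshq/server.py` below.

```python
# Scripted server replies: first a command, then a final answer.
SCRIPTED_RESPONSES = [
    {"action": "command", "command": "pidstat 1 1"},
    {"action": "answer", "answer": "pidstat shows process foo dominated the CPU."},
]

def post_agentic(payload, step):
    """Hypothetical stand-in for POSTing to http://localhost:<port>/agentic."""
    assert "goal" in payload and isinstance(payload["history"], list)
    return SCRIPTED_RESPONSES[step]

goal = "analyze the process consuming the most CPU"
history = []
for step in range(len(SCRIPTED_RESPONSES)):
    result = post_agentic({"goal": goal, "history": history}, step)
    if result["action"] == "answer":
        print("final answer:", result["answer"])
        break
    # The real loop asks for [Y/n] confirmation, runs the command with
    # subprocess, and appends the truncated stdout/stderr to the history.
    history.append({
        "command": result["command"],
        "stdout": "(captured stdout)",
        "stderr": "",
        "exit_code": 0,
    })
```

The `history` list is resent in full on every round, so the server stays stateless; only the client tracks progress.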
diff --git a/src/sshq/cli.py b/src/sshq/cli.py
index 17d2d4a..3809bea 100644
--- a/src/sshq/cli.py
+++ b/src/sshq/cli.py
@@ -64,11 +64,14 @@ def _stop_ramalama() -> None:
 import json
 import urllib.request
+import urllib.error
 import subprocess
+import os
 
 def main():
     if len(sys.argv) < 2:
         print("Usage: q <prompt>")
         print("       q --analyze ")
+        print("       q --agentic <goal>  # multi-step commands until the goal is answered")
         sys.exit(1)
 
     # q --analyze
@@ -114,6 +116,101 @@
             sys.exit(1)
         return
 
+    # q --agentic <goal> -> multi-step agent loop
+    if len(sys.argv) >= 2 and sys.argv[1] == "--agentic":
+        goal = " ".join(sys.argv[2:]).strip()
+        if not goal:
+            print("Usage: q --agentic <goal>")
+            sys.exit(1)
+
+        def _truncate_field(text, max_len):
+            text = text or ""
+            if len(text) <= max_len:
+                return text
+            return text[: max(0, max_len - 40)] + "\\n[... truncated ...]\\n"
+
+        try:
+            max_steps = int(os.environ.get("SSHQ_AGENTIC_MAX_STEPS", "25"))
+        except ValueError:
+            max_steps = 25
+        try:
+            max_chars = int(os.environ.get("SSHQ_AGENTIC_MAX_OUTPUT_CHARS", "32000"))
+        except ValueError:
+            max_chars = 32000
+        half = max(1000, max_chars // 2)
+
+        history = []
+        for step in range(1, max_steps + 1):
+            payload = json.dumps({{"goal": goal, "history": history}}).encode("utf-8")
+            req = urllib.request.Request(
+                "http://localhost:{port}/agentic",
+                data=payload,
+                headers={{"Content-Type": "application/json"}},
+            )
+            try:
+                with urllib.request.urlopen(req) as response:
+                    result = json.loads(response.read().decode())
+            except urllib.error.HTTPError as e:
+                try:
+                    body = e.read().decode()
+                    res = json.loads(body)
+                    msg = res.get("error", body or e.reason)
+                except Exception:
+                    msg = e.reason or str(e)
+                print(f"Error: {{msg}}")
+                sys.exit(1)
+            except urllib.error.URLError as e:
+                print("Error: Tunnel is down. Did you connect using sshq?")
+                if e.reason:
+                    print(f"  ({{e.reason}})")
+                sys.exit(1)
+
+            if "error" in result and result.get("error"):
+                print(f"Error: {{result['error']}}")
+                sys.exit(1)
+
+            action = result.get("action")
+            if action == "answer":
+                print()
+                print(f"✅ {{result.get('answer', '')}}")
+                return
+
+            if action != "command":
+                print("Error: Unexpected response from server.")
+                sys.exit(1)
+
+            command = result.get("command")
+            if not command:
+                print("Error: No command returned.")
+                sys.exit(1)
+
+            print()
+            print(f"\\033[1;90m[agentic step {{step}}/{{max_steps}}]\\033[0m")
+            print(f"\\033[1;36m{{command}}\\033[0m")
+            print()
+            choice = input("Run this command? [Y/n] ").strip().lower()
+            if choice in ("n", "no"):
+                print("Aborted.")
+                sys.exit(0)
+
+            print()
+            completed = subprocess.run(command, shell=True, capture_output=True, text=True)
+            if completed.stdout:
+                sys.stdout.write(completed.stdout)
+            if completed.stderr:
+                sys.stderr.write(completed.stderr)
+            out = _truncate_field(completed.stdout, half)
+            err = _truncate_field(completed.stderr, half)
+            history.append({{
+                "command": command,
+                "stdout": out,
+                "stderr": err,
+                "exit_code": completed.returncode,
+            }})
+
+        print(f"Stopped after {{max_steps}} steps (SSHQ_AGENTIC_MAX_STEPS). Increase the limit or narrow the goal.")
+        sys.exit(1)
+
     # q <prompt> -> suggest command
     prompt = " ".join(sys.argv[1:])
     data = json.dumps({{"prompt": prompt}}).encode('utf-8')
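A note on the truncation budget above, outside the patch: inside `Q_SCRIPT` the doubled braces and `\\n` escapes belong to the template and collapse to normal Python once `.format(port=...)` renders it. A standalone copy of `_truncate_field` shows that the default `SSHQ_AGENTIC_MAX_OUTPUT_CHARS=32000` gives each stream a 16000-character budget, and truncated output stays within it:

```python
def _truncate_field(text, max_len):
    # Standalone copy of the Q_SCRIPT helper, with the escapes rendered.
    text = text or ""
    if len(text) <= max_len:
        return text
    # Reserve 40 chars of headroom so the marker never pushes past max_len.
    return text[: max(0, max_len - 40)] + "\n[... truncated ...]\n"

max_chars = 32000                 # SSHQ_AGENTIC_MAX_OUTPUT_CHARS default
half = max(1000, max_chars // 2)  # per-stream budget: 16000

out = _truncate_field("x" * 50000, half)
assert len(out) <= half
assert out.endswith("[... truncated ...]\n")
print(len(out))  # 15981 = 15960 kept chars + 21-char marker
```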
diff --git a/src/sshq/server.py b/src/sshq/server.py
index 473cd94..8d2988f 100644
--- a/src/sshq/server.py
+++ b/src/sshq/server.py
@@ -1,11 +1,78 @@
+import json
 import logging
 import re
+from typing import Optional, Tuple
+
 import flask.cli
 from flask import Flask, request, jsonify
 
 from .backends import get_backend
 
 
+def _extract_json_object(text: str) -> Optional[dict]:
+    """Parse a JSON object from model output, optionally inside a markdown code fence."""
+    text = text.strip()
+    match = re.search(r"```(?:json)?\s*(\{.*\})\s*```", text, re.DOTALL)
+    if match:
+        text = match.group(1)
+    start = text.find("{")
+    if start < 0:
+        return None
+    try:
+        obj, _end = json.JSONDecoder().raw_decode(text, start)
+        if isinstance(obj, dict):
+            return obj
+    except json.JSONDecodeError:
+        pass
+    return None
+
+
+def _format_agentic_history(history: list) -> str:
+    if not history:
+        return "No commands have been run yet."
+    parts = []
+    for i, item in enumerate(history, start=1):
+        cmd = item.get("command", "")
+        code = item.get("exit_code", "")
+        out = item.get("stdout", "") or ""
+        err = item.get("stderr", "") or ""
+        parts.append(
+            f"### Step {i}\n"
+            f"Command: {cmd}\n"
+            f"Exit code: {code}\n"
+            f"Stdout:\n{out}\n"
+            f"Stderr:\n{err}\n"
+        )
+    return "\n".join(parts)
+
+
+def _parse_agentic_response(raw: str) -> Tuple[str, Optional[str], Optional[str]]:
+    """
+    Returns (action, command_or_none, answer_or_none).
+    action is 'command', 'answer', or 'error'.
+    """
+    obj = _extract_json_object(raw)
+    if not obj:
+        return ("error", None, raw.strip() or "Could not parse model response as JSON.")
+
+    typ = (obj.get("type") or "").strip().lower()
+    if typ == "answer":
+        ans = obj.get("answer")
+        if ans is None:
+            ans = obj.get("text")
+        if not isinstance(ans, str) or not ans.strip():
+            return ("error", None, "JSON type 'answer' but missing non-empty 'answer' string.")
+        return ("answer", None, ans.strip())
+
+    if typ == "command":
+        cmd = obj.get("command")
+        if not isinstance(cmd, str) or not cmd.strip():
+            return ("error", None, "JSON type 'command' but missing non-empty 'command' string.")
+        return ("command", cmd.strip(), None)
+
+    return ("error", None, f"Unknown or missing JSON 'type': {typ!r}")
+
+
 def _extract_command(raw: str) -> str:
     """Extract a single shell command from model output that may include markdown or explanations."""
     text = raw.strip()
@@ -79,6 +146,55 @@
         return jsonify({"error": str(e)}), 500
 
 
+@app.route("/agentic", methods=["POST"])
+def agentic():
+    data = request.json
+    if not data or "goal" not in data:
+        return jsonify({"error": "goal is required"}), 400
+
+    goal = (data.get("goal") or "").strip()
+    if not goal:
+        return jsonify({"error": "goal must be non-empty"}), 400
+
+    history = data.get("history")
+    if history is None:
+        history = []
+    if not isinstance(history, list):
+        return jsonify({"error": "history must be a list"}), 400
+
+    system_instruction = (
+        "You are an expert embedded Linux engineer helping via a multi-step shell workflow. "
+        "The user is on a device reached through SSH; you never run commands yourself—you only "
+        "propose one shell command at a time or give a final answer.\n\n"
+        "Respond with ONLY one JSON object (no markdown fences, no other text):\n"
+        '- To gather more data: {"type": "command", "command": "<shell command>"}\n'
+        '- When the user\'s goal is fully satisfied from the information available (including '
+        "outputs in the history): {\"type\": \"answer\", \"answer\": \"<final answer>\"}\n\n"
+        "Prefer non-destructive, read-only commands unless the goal requires changes. "
+        "Do NOT use sudo unless clearly needed. One command per step; avoid compound pipelines "
+        "unless necessary. If output was truncated, you may suggest a narrower follow-up command."
+    )
+
+    user_message = (
+        "## User goal\n"
+        f"{goal}\n\n"
+        "## Command history and outputs\n"
+        f"{_format_agentic_history(history)}\n\n"
+        "Decide the next single shell command to run, or answer the goal if you already can."
+    )
+
+    try:
+        text = backend(user_message, system_instruction, temperature=0.0, max_tokens=2048)
+        action, command, answer = _parse_agentic_response(text)
+        if action == "error":
+            return jsonify({"error": answer or "Invalid agentic response"}), 500
+        if action == "answer":
+            return jsonify({"action": "answer", "answer": answer})
+        return jsonify({"action": "command", "command": command})
+    except Exception as e:
+        return jsonify({"error": str(e)}), 500
+
+
 def start_server(port):
     global backend
     backend = get_backend()
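`_extract_json_object` above is self-contained, so it can be exercised directly; copied out of the patch, it shows which model outputs survive parsing. The regex peels an optional markdown code fence, and `raw_decode` starts at the first `{` and tolerates trailing chatter:

```python
import json
import re

def _extract_json_object(text):
    # Copied (minus type hints) from src/sshq/server.py in this patch.
    text = text.strip()
    match = re.search(r"```(?:json)?\s*(\{.*\})\s*```", text, re.DOTALL)
    if match:
        text = match.group(1)
    start = text.find("{")
    if start < 0:
        return None
    try:
        obj, _end = json.JSONDecoder().raw_decode(text, start)
        if isinstance(obj, dict):
            return obj
    except json.JSONDecodeError:
        pass
    return None

print(_extract_json_object('{"type": "command", "command": "uptime"}'))
print(_extract_json_object('```json\n{"type": "answer", "answer": "ok"}\n```'))
print(_extract_json_object('Sure! {"type": "command", "command": "df -h"} Hope that helps.'))
print(_extract_json_object("not json"))  # None
```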
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 3609fe9..8e7ebbb 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -5,6 +5,8 @@
 
 import pytest
 
+from sshq.cli import Q_SCRIPT
+
 
 def run_main(argv, env=None, prog="sshq", clear_env=False):
     """Run cli.main with given argv and env; return (exit_code, stdout, stderr)."""
@@ -51,6 +53,10 @@ def test_version_exits_zero_and_prints_version(argv):
     assert err == ""
 
 
+def test_q_script_format_is_valid_python():
+    compile(Q_SCRIPT.format(port=12345), "<string>", "exec")
+
+
 def test_missing_both_api_keys_exits_nonzero_and_prints_to_stderr():
     env = {
         k: v
diff --git a/tests/test_server.py b/tests/test_server.py
index 02a7f03..31854c6 100644
--- a/tests/test_server.py
+++ b/tests/test_server.py
@@ -99,3 +99,77 @@ def test_analyze_on_api_error_returns_500(client, mock_backend):
     assert r.status_code == 500
     assert "error" in r.json
     assert "API error" in r.json["error"]
+
+
+# --- /agentic ---
+
+
+def test_agentic_without_goal_returns_400(client):
+    r = client.post("/agentic", json={})
+    assert r.status_code == 400
+    assert "goal" in r.json["error"].lower()
+
+    r = client.post("/agentic", json={"goal": " "})
+    assert r.status_code == 400
+
+
+def test_agentic_invalid_history_returns_400(client):
+    r = client.post("/agentic", json={"goal": "check disk", "history": "not-a-list"})
+    assert r.status_code == 400
+
+
+def test_agentic_returns_command(client, mock_backend):
+    mock_backend.return_value = '{"type": "command", "command": "df -h"}'
+
+    r = client.post("/agentic", json={"goal": "show disk usage", "history": []})
+    assert r.status_code == 200
+    assert r.json == {"action": "command", "command": "df -h"}
+    mock_backend.assert_called_once()
+
+
+def test_agentic_returns_answer(client, mock_backend):
+    mock_backend.return_value = '{"type": "answer", "answer": "The busiest CPU was process foo."}'
+
+    r = client.post(
+        "/agentic",
+        json={
+            "goal": "who used the CPU?",
+            "history": [
+                {
+                    "command": "ps aux",
+                    "stdout": "foo 99%",
+                    "stderr": "",
+                    "exit_code": 0,
+                }
+            ],
+        },
+    )
+    assert r.status_code == 200
+    assert r.json == {
+        "action": "answer",
+        "answer": "The busiest CPU was process foo.",
+    }
+
+
+def test_agentic_accepts_json_in_markdown_fence(client, mock_backend):
+    mock_backend.return_value = '```json\n{"type": "command", "command": "uptime"}\n```'
+
+    r = client.post("/agentic", json={"goal": "load average"})
+    assert r.status_code == 200
+    assert r.json == {"action": "command", "command": "uptime"}
+
+
+def test_agentic_on_parse_error_returns_500(client, mock_backend):
+    mock_backend.return_value = "not json"
+
+    r = client.post("/agentic", json={"goal": "x"})
+    assert r.status_code == 500
+    assert "error" in r.json
+
+
+def test_agentic_on_api_error_returns_500(client, mock_backend):
+    mock_backend.side_effect = RuntimeError("API error")
+
+    r = client.post("/agentic", json={"goal": "x"})
+    assert r.status_code == 500
+    assert "API error" in r.json["error"]
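For a sense of what the model actually reads, the `history` fixture from `test_agentic_returns_answer` can be rendered with a standalone copy of `_format_agentic_history` from the server diff above; the output is the `## Command history and outputs` section of the prompt after one step:

```python
def _format_agentic_history(history):
    # Copied (minus the type hint) from src/sshq/server.py in this patch.
    if not history:
        return "No commands have been run yet."
    parts = []
    for i, item in enumerate(history, start=1):
        cmd = item.get("command", "")
        code = item.get("exit_code", "")
        out = item.get("stdout", "") or ""
        err = item.get("stderr", "") or ""
        parts.append(
            f"### Step {i}\n"
            f"Command: {cmd}\n"
            f"Exit code: {code}\n"
            f"Stdout:\n{out}\n"
            f"Stderr:\n{err}\n"
        )
    return "\n".join(parts)

print(_format_agentic_history([
    {"command": "ps aux", "stdout": "foo 99%", "stderr": "", "exit_code": 0},
]))
# ### Step 1
# Command: ps aux
# Exit code: 0
# Stdout:
# foo 99%
# Stderr:
#
```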