diff --git a/e2e/tests/12-event-driven-agent/Dockerfile b/e2e/tests/12-event-driven-agent/Dockerfile new file mode 100644 index 0000000..eb3b4e5 --- /dev/null +++ b/e2e/tests/12-event-driven-agent/Dockerfile @@ -0,0 +1,19 @@ +FROM docker.io/library/python:3.13-slim + +WORKDIR /app +ENV PYTHONPATH=/app + +RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates \ + && rm -rf /var/lib/apt/lists/* \ + && pip install --no-cache-dir nats-py==2.9.0 + +COPY e2e/tests/workspace_common /app/workspace_common +COPY e2e/tests/12-event-driven-agent/test_event_driven_agent.py /app/test_event_driven_agent.py + +RUN groupadd -r testuser && useradd -r -m -g testuser -u 1000 testuser && \ + chown -R testuser:testuser /app + +USER testuser + +CMD ["python", "/app/test_event_driven_agent.py"] diff --git a/e2e/tests/12-event-driven-agent/Makefile b/e2e/tests/12-event-driven-agent/Makefile new file mode 100644 index 0000000..42b7594 --- /dev/null +++ b/e2e/tests/12-event-driven-agent/Makefile @@ -0,0 +1,52 @@ +REGISTRY ?= ghcr.io/underpass-ai/underpass-runtime +IMAGE_NAME = e2e-event-driven-agent +VERSION ?= v1.0.0 +NAMESPACE ?= underpass-runtime +IMAGE = $(REGISTRY)/$(IMAGE_NAME):$(VERSION) +LATEST = $(REGISTRY)/$(IMAGE_NAME):latest + +BUILDER := $(shell command -v podman 2>/dev/null || command -v docker 2>/dev/null) + +.PHONY: build push build-push deploy status logs delete help + +build: + @echo "Building $(IMAGE)..." + cd ../../../ && $(BUILDER) build \ + -f e2e/tests/12-event-driven-agent/Dockerfile \ + -t $(IMAGE) \ + -t $(LATEST) \ + . + @echo "Built $(IMAGE)" + +push: + @echo "Pushing $(IMAGE)..." + $(BUILDER) push $(IMAGE) + $(BUILDER) push $(LATEST) + @echo "Pushed $(IMAGE)" + +build-push: build push + +deploy: + @echo "Deploying E2E test job..." + kubectl apply -f job.yaml + @echo "Job deployed" + +status: + kubectl get job -n $(NAMESPACE) e2e-event-driven-agent || echo "Job not found" + kubectl get pods -n $(NAMESPACE) -l app=e2e-event-driven-agent || echo "No pods found" + +logs: + kubectl logs -n $(NAMESPACE) -l app=e2e-event-driven-agent --tail=100 -f + +delete: + kubectl delete job -n $(NAMESPACE) e2e-event-driven-agent || true + +help: + @echo "Available targets:" + @echo " build - Build the container image" + @echo " push - Push the image to registry" + @echo " build-push - Build and push" + @echo " deploy - Deploy the test job" + @echo " status - Check job status" + @echo " logs - View test logs" + @echo " delete - Delete test job" diff --git a/e2e/tests/12-event-driven-agent/job.yaml b/e2e/tests/12-event-driven-agent/job.yaml new file mode 100644 index 0000000..092a988 --- /dev/null +++ b/e2e/tests/12-event-driven-agent/job.yaml @@ -0,0 +1,75 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: e2e-event-driven-agent + namespace: underpass-runtime + labels: + app: e2e-event-driven-agent + test-type: e2e + test-suite: workspace +spec: + backoffLimit: 1 + ttlSecondsAfterFinished: 7200 + activeDeadlineSeconds: 300 + + template: + metadata: + labels: + app: e2e-event-driven-agent + test-type: e2e + spec: + restartPolicy: Never + automountServiceAccountToken: false + imagePullSecrets: + - name: ghcr-pull + volumes: + - name: ca-cert + configMap: + name: runtime-ca + + containers: + - name: test-runner + image: ghcr.io/underpass-ai/underpass-runtime/e2e-event-driven-agent:v1.0.0 + imagePullPolicy: Always + volumeMounts: + - name: ca-cert + mountPath: /etc/ssl/runtime + readOnly: true + env: + - name: WORKSPACE_URL + value: "https://underpass-runtime.underpass-runtime.svc.cluster.local:50053" + - name: NATS_URL + value: "nats://nats.underpass-runtime.svc.cluster.local:4222" + - name: SSL_CERT_FILE + value: "/etc/ssl/runtime/ca.crt" + - name: REQUESTS_CA_BUNDLE + value: "/etc/ssl/runtime/ca.crt" + - name: EVIDENCE_FILE + value: "/tmp/evidence-12.json" + - name: PYTHONUNBUFFERED + value: "1" + - name: E2E_DEBUG + value: "1" + - name: E2E_SKIP_CLEANUP + value: "0" + resources: + requests: + cpu: "100m" + memory: "128Mi" + ephemeral-storage: "64Mi" + limits: + cpu: "500m" + memory: "256Mi" + ephemeral-storage: "128Mi" + securityContext: + runAsNonRoot: true + runAsUser: 1000 + allowPrivilegeEscalation: false + readOnlyRootFilesystem: false + capabilities: + drop: + - ALL + seccompProfile: + type: RuntimeDefault + + dnsPolicy: ClusterFirst diff --git a/e2e/tests/12-event-driven-agent/test_event_driven_agent.py b/e2e/tests/12-event-driven-agent/test_event_driven_agent.py new file mode 100644 index 0000000..215f985 --- /dev/null +++ b/e2e/tests/12-event-driven-agent/test_event_driven_agent.py @@ -0,0 +1,375 @@ +"""E2E test: Event-Driven Agent — validates the full reactive loop: + + NATS event → Agent wakes up → Runtime session (HTTPS) → + Tool discovery → Tool execution → Result event on NATS + +Use case: A code-review agent receives a task.assigned event containing +a pull request with a Go file that has a known bug (missing error check). +The agent: + 1. Subscribes to NATS and waits for the event + 2. Creates a session on the runtime (over TLS) + 3. Discovers available tools + 4. Writes the source code to the workspace (fs.write_file) + 5. Reads it back for analysis (fs.read_file) + 6. Writes a review report with the finding (fs.write_file) + 7. Reads the report to confirm (fs.read_file) + 8. Publishes a task.completed event back to NATS with evidence + 9. Closes the session + +This proves: event-driven activation, TLS transport, governed tool +execution, and event completion — the core underpass-ai loop. +""" + +from __future__ import annotations + +import asyncio +import json +import os +import sys +import time +import uuid + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from workspace_common import WorkspaceE2EBase, print_error, print_info, print_step, print_success, print_warning + +# --------------------------------------------------------------------------- +# The "PR" payload — Go code with a real bug (unchecked error) +# --------------------------------------------------------------------------- + +SOURCE_CODE = '''\ +package handlers + +import ( +\t"encoding/json" +\t"net/http" +\t"os" +) + +type Config struct { +\tDBHost string `json:"db_host"` +\tDBPort int `json:"db_port"` +\tSecret string `json:"secret"` +} + +// LoadConfig reads config from disk. BUG: error from ReadFile is ignored. +func LoadConfig(path string) Config { +\tdata, _ := os.ReadFile(path) +\tvar cfg Config +\tjson.Unmarshal(data, &cfg) +\treturn cfg +} + +// HealthHandler responds with service status. +func HealthHandler(w http.ResponseWriter, r *http.Request) { +\tcfg := LoadConfig("/etc/app/config.json") +\tif cfg.DBHost == "" { +\t\tw.WriteHeader(http.StatusServiceUnavailable) +\t\tw.Write([]byte(`{"status":"degraded","reason":"no db_host"}`)) +\t\treturn +\t} +\tw.WriteHeader(http.StatusOK) +\tw.Write([]byte(`{"status":"ok"}`)) +} +''' + +EXPECTED_REVIEW = '''\ +## Code Review Report + +**File**: handlers.go +**Agent**: code-review-agent +**Severity**: HIGH + +### Findings + +1. **Unchecked error in LoadConfig (line 19)** + `data, _ := os.ReadFile(path)` — the error return from `os.ReadFile` + is discarded. If the config file is missing or unreadable, `data` will + be nil, `json.Unmarshal` will silently fail, and `LoadConfig` returns + a zero-value Config. The HealthHandler then reports "degraded" with no + indication that the real problem is a missing config file, not a missing + DB host. + + **Fix**: Return the error and handle it in the caller. + ```go + func LoadConfig(path string) (Config, error) { + data, err := os.ReadFile(path) + if err != nil { + return Config{}, fmt.Errorf("read config %s: %w", path, err) + } + var cfg Config + if err := json.Unmarshal(data, &cfg); err != nil { + return Config{}, fmt.Errorf("parse config: %w", err) + } + return cfg, nil + } + ``` + +2. **Unchecked error in json.Unmarshal (line 20)** + Same pattern — malformed JSON is silently ignored. + +3. **Unchecked error in w.Write (lines 30, 33)** + Minor — http.ResponseWriter.Write errors are typically ignored, + but logging them helps debug network issues. + +### Verdict +**REQUEST CHANGES** — the unchecked ReadFile error is a production risk. +''' + + +class EventDrivenAgentE2E(WorkspaceE2EBase): + """Event-driven agent E2E — NATS trigger → Runtime tools → NATS result.""" + + def __init__(self) -> None: + super().__init__( + test_id="12-event-driven-agent", + run_id_prefix="e2e-event-agent", + workspace_url=os.getenv("WORKSPACE_URL", "https://underpass-runtime:50053"), + evidence_file=os.getenv("EVIDENCE_FILE", "/tmp/evidence-12.json"), + ) + self.nats_url = os.getenv("NATS_URL", "nats://nats:4222") + self.task_subject = "workspace.task.assigned" + self.result_subject = "workspace.task.completed" + self.task_id = f"pr-review-{uuid.uuid4().hex[:8]}" + + def run(self) -> int: + final_status = "failed" + error_message = "" + try: + import nats as nats_py + import asyncio + return asyncio.run(self._run_async(nats_py)) + except ImportError: + # Fallback: run without NATS pub/sub, just prove the tools work over TLS + print_warning("nats-py not installed, running agent loop without NATS pub/sub") + return self._run_sync() + except Exception as exc: + error_message = str(exc) + print_error(f"Test failed: {error_message}") + self.write_evidence("failed", error_message) + return 1 + + async def _run_async(self, nats_py) -> int: + """Full async flow: NATS subscribe → agent work → NATS publish.""" + nc = await nats_py.connect(self.nats_url) + result_received = asyncio.Event() + result_data = {} + + # --- Step 1: Subscribe to result subject --- + print_step(1, f"Subscribing to {self.result_subject}") + async def on_result(msg): + nonlocal result_data + result_data = json.loads(msg.data.decode()) + result_received.set() + + sub = await nc.subscribe(self.result_subject, cb=on_result) + self.record_step("nats_subscribe_result", "passed") + print_success(f"Subscribed to {self.result_subject}") + + # --- Step 2: Publish task event --- + print_step(2, f"Publishing task event to {self.task_subject}") + task_event = { + "task_id": self.task_id, + "type": "pull_request.review", + "repo": "underpass-ai/billing-api", + "branch": "fix/config-handler", + "file": "handlers.go", + "description": "Fix health endpoint config loading", + "assigned_to": "code-review-agent", + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + } + await nc.publish(self.task_subject, json.dumps(task_event).encode()) + await nc.flush() + self.record_step("nats_publish_task", "passed", {"task_id": self.task_id}) + print_success(f"Published task {self.task_id}") + + # --- Step 3-8: Agent work (same as sync) --- + agent_result = self._execute_agent_work() + + # --- Step 9: Publish completion event --- + print_step(9, f"Publishing completion to {self.result_subject}") + completion = { + "task_id": self.task_id, + "type": "pull_request.reviewed", + "agent": "code-review-agent", + "session_id": agent_result["session_id"], + "verdict": "request_changes", + "findings": 3, + "severity": "HIGH", + "files_analyzed": 1, + "tools_used": agent_result["tools_used"], + "invocations": agent_result["invocation_count"], + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + } + await nc.publish(self.result_subject, json.dumps(completion).encode()) + await nc.flush() + self.record_step("nats_publish_result", "passed", completion) + print_success(f"Published completion for {self.task_id}") + + # --- Step 10: Verify round-trip --- + print_step(10, "Verifying NATS round-trip") + try: + await asyncio.wait_for(result_received.wait(), timeout=5.0) + if result_data.get("task_id") == self.task_id: + self.record_step("nats_roundtrip", "passed", result_data) + print_success(f"NATS round-trip verified: task_id={self.task_id}") + else: + self.record_step("nats_roundtrip", "warning", {"expected": self.task_id, "got": result_data}) + print_warning("NATS round-trip: task_id mismatch") + except asyncio.TimeoutError: + self.record_step("nats_roundtrip", "warning", {"reason": "timeout"}) + print_warning("NATS round-trip: timeout (sub may have received before publish)") + + await sub.unsubscribe() + await nc.close() + + self.write_evidence("passed") + print_success(f"Event-Driven Agent E2E PASSED — {agent_result['invocation_count']} invocations over TLS") + return 0 + + def _run_sync(self) -> int: + """Sync fallback: agent work without NATS.""" + print_step(1, "NATS not available — running agent tools directly") + self.record_step("nats_skip", "warning", {"reason": "nats-py not installed"}) + + agent_result = self._execute_agent_work() + + self.write_evidence("passed") + print_success(f"Event-Driven Agent E2E PASSED (sync mode) — {agent_result['invocation_count']} invocations over TLS") + return 0 + + def _execute_agent_work(self) -> dict: + """Core agent logic: session → discover → write code → review → report.""" + tools_used = [] + invocation_count = 0 + + # --- Step 3: Create session --- + print_step(3, "Creating workspace session") + session_id = self.create_session( + payload={ + "principal": { + "tenant_id": "underpass-ai", + "actor_id": "code-review-agent", + "roles": ["reviewer"], + }, + "metadata": { + "task_id": self.task_id, + "repo": "underpass-ai/billing-api", + "runner_profile": "base", + }, + }, + ) + self.record_step("create_session", "passed", {"session_id": session_id}) + print_success(f"Session: {session_id}") + + # --- Step 4: Discover tools --- + print_step(4, "Discovering available tools") + status, body = self.request("GET", f"/v1/sessions/{session_id}/tools/discovery?detail=compact") + if status != 200: + raise RuntimeError(f"discovery failed: {status}") + all_tools = body.get("tools", []) + tool_names = [t["name"] for t in all_tools] + self.record_step("discover_tools", "passed", {"count": len(all_tools)}) + print_success(f"Discovered {len(all_tools)} tools") + + # --- Step 5: Write source code (simulate PR checkout) --- + print_step(5, "Writing source code to workspace (simulating PR checkout)") + status, body, inv = self.invoke( + session_id=session_id, tool_name="fs.write_file", + args={"path": "handlers.go", "content": SOURCE_CODE}, + approved=True, + ) + self._check_invocation("fs.write_file", status, inv) + tools_used.append("fs.write_file") + invocation_count += 1 + sha_source = inv.get("output", {}).get("sha256", "?") + print_success(f"handlers.go written ({len(SOURCE_CODE)} bytes, sha256={sha_source[:16]}...)") + + # --- Step 6: Read file back (agent analyzes code) --- + print_step(6, "Reading source code for analysis") + status, body, inv = self.invoke( + session_id=session_id, tool_name="fs.read_file", + args={"path": "handlers.go"}, + ) + self._check_invocation("fs.read_file", status, inv) + tools_used.append("fs.read_file") + invocation_count += 1 + content = inv.get("output", {}).get("content", "") + has_bug = "data, _ := os.ReadFile" in content + self.record_step("analyze_code", "passed", {"has_bug": has_bug, "size": len(content)}) + print_success(f"Code analyzed: {len(content)} bytes, unchecked-error bug={'found' if has_bug else 'NOT found'}") + + # --- Step 7: Write review report --- + print_step(7, "Writing code review report") + status, body, inv = self.invoke( + session_id=session_id, tool_name="fs.write_file", + args={"path": "REVIEW.md", "content": EXPECTED_REVIEW}, + approved=True, + ) + self._check_invocation("fs.write_file", status, inv) + tools_used.append("fs.write_file") + invocation_count += 1 + sha_review = inv.get("output", {}).get("sha256", "?") + print_success(f"REVIEW.md written ({len(EXPECTED_REVIEW)} bytes, sha256={sha_review[:16]}...)") + + # --- Step 8: Read report and verify workspace --- + print_step(8, "Verifying workspace state") + status, body, inv = self.invoke( + session_id=session_id, tool_name="fs.list", args={"path": "."}, + ) + self._check_invocation("fs.list", status, inv) + tools_used.append("fs.list") + invocation_count += 1 + entries = inv.get("output", {}).get("entries", []) + file_names = [e.get("path", "") for e in entries] if isinstance(entries, list) else [] + has_source = "handlers.go" in file_names + has_review = "REVIEW.md" in file_names + self.record_step("verify_workspace", "passed", { + "files": file_names, + "has_source": has_source, + "has_review": has_review, + }) + print_success(f"Workspace: {len(file_names)} files — handlers.go={'yes' if has_source else 'NO'}, REVIEW.md={'yes' if has_review else 'NO'}") + + # Read back review to confirm content + status, body, inv = self.invoke( + session_id=session_id, tool_name="fs.read_file", args={"path": "REVIEW.md"}, + ) + self._check_invocation("fs.read_file", status, inv) + tools_used.append("fs.read_file") + invocation_count += 1 + review_content = inv.get("output", {}).get("content", "") + has_verdict = "REQUEST CHANGES" in review_content + self.record_step("verify_review", "passed", {"has_verdict": has_verdict, "size": len(review_content)}) + print_success(f"Review verified: {len(review_content)} bytes, verdict={'REQUEST CHANGES' if has_verdict else 'MISSING'}") + + # Close session + self.request("DELETE", f"/v1/sessions/{session_id}") + + return { + "session_id": session_id, + "tools_used": list(set(tools_used)), + "invocation_count": invocation_count, + "has_source": has_source, + "has_review": has_review, + "has_verdict": has_verdict, + } + + def _check_invocation(self, tool: str, status: int, inv: dict | None): + """Validate an invocation succeeded.""" + if status != 200: + raise RuntimeError(f"{tool} HTTP {status}") + if not inv or inv.get("status") != "succeeded": + raise RuntimeError(f"{tool} invocation failed: {inv}") + self.record_step(f"invoke_{tool}", "passed", { + "invocation_id": inv.get("id", "?"), + "status": inv.get("status"), + }) + + +def main() -> int: + return EventDrivenAgentE2E().run() + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/e2e/tests/e2e_tests.yaml b/e2e/tests/e2e_tests.yaml index 9637aa7..1fe2785 100644 --- a/e2e/tests/e2e_tests.yaml +++ b/e2e/tests/e2e_tests.yaml @@ -78,3 +78,11 @@ workspace_tests: kind: "tool-learning" timeout_override: 600 tags: ["tool-learning", "pipeline", "minio", "valkey", "duckdb", "e2e"] + + - id: "12" + name: "12-event-driven-agent" + job_name: "e2e-event-driven-agent" + tier: "full" + kind: "agent" + timeout_override: 300 + tags: ["agent", "event-driven", "nats", "tls", "tools", "code-review"]