-
Notifications
You must be signed in to change notification settings - Fork 0
Feat/basilica #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
c19638e
fa0b555
5ce99b5
ef20689
3372a07
496c43e
0853bd6
61d2eea
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,87 @@ | ||
| """Basilica Sandbox Manager - adapts basilica SDK to ridges interface.""" | ||
|
|
||
| import os | ||
| import json | ||
| import shutil | ||
| import tempfile | ||
| from typing import Any, Dict, Callable, Optional | ||
| from dataclasses import dataclass | ||
|
|
||
| import basilica | ||
| from basilica import Sandbox, python_sandbox | ||
| from evaluator.models import SandboxResultWithLogs | ||
|
|
||
|
|
||
@dataclass
class SandboxHandle:
    """Lightweight record pairing a live sandbox with the script it should run."""
    name: str  # caller-supplied identifier for this sandbox
    sandbox: Sandbox  # live basilica Sandbox instance
    script_name: str  # basename of the script uploaded under /sandbox/
    timeout: int = 3600  # exec timeout in seconds (default 1 hour)
|
|
||
|
|
||
class BasilicaSandboxManager:
    """Adapts basilica.Sandbox to ridges SandboxManager interface."""

    def __init__(self, inference_gateway_url: str = None):
        """Configure the basilica SDK from environment variables.

        Reads BASILICA_API_URL (default http://localhost:9080) and
        BASILICA_API_TOKEN.

        Raises:
            ValueError: if BASILICA_API_TOKEN is not set.
        """
        api_url = os.environ.get("BASILICA_API_URL", "http://localhost:9080")
        api_token = os.environ.get("BASILICA_API_TOKEN")
        if not api_token:
            raise ValueError("BASILICA_API_TOKEN required")
        basilica.configure(api_url=api_url, api_key=api_token)

    def initialize_sandbox(
        self, *, name: str, script_path: str, input_data: Any = None,
        env_vars: Optional[Dict[str, str]] = None, on_mount: Callable[[str], None] = None,
        timeout_seconds: int = None
    ) -> SandboxHandle:
        """Create a container sandbox and upload script, inputs and mounted files.

        Args:
            name: caller-supplied identifier for the sandbox.
            script_path: local path of the script to upload and later execute.
            input_data: JSON-serializable payload written to /sandbox/input.json.
            env_vars: extra environment variables for the sandbox (None == {}).
            on_mount: callback given a temp dir; files it writes there are
                uploaded into /sandbox/ preserving relative paths.
            timeout_seconds: exec timeout; defaults to 3600.

        Returns:
            A SandboxHandle ready for run_sandbox().
        """
        # Fix: the original used a mutable default argument (env_vars={}),
        # which is shared across calls; normalize None here instead.
        env_vars = env_vars or {}
        script_name = os.path.basename(script_path)
        sandbox = python_sandbox(
            runtime="container",
            env={**env_vars, "PYTHONUNBUFFERED": "1"},
            timeout_seconds=timeout_seconds or 3600,
        )

        # Upload files produced by the on_mount callback.
        if on_mount:
            tmp = tempfile.mkdtemp()
            try:
                on_mount(tmp)
                for root, _, files in os.walk(tmp):
                    for fname in files:
                        local = os.path.join(root, fname)
                        try:
                            # Context manager so the handle is always closed
                            # (the original leaked open file objects).
                            with open(local) as fh:
                                content = fh.read()
                        except (UnicodeDecodeError, IOError):
                            # Binary/undecodable files are skipped: the upload
                            # API here is text-only. NOTE(review): consider
                            # logging a warning so skips are not silent.
                            continue
                        sandbox.files.write(
                            f"/sandbox/{os.path.relpath(local, tmp)}", content
                        )
            finally:
                # Clean the temp dir even if on_mount or an upload raised.
                shutil.rmtree(tmp, ignore_errors=True)

        with open(script_path) as fh:
            sandbox.files.write(f"/sandbox/{script_name}", fh.read())
        if input_data is not None:
            sandbox.files.write("/sandbox/input.json", json.dumps(input_data, indent=2))

        return SandboxHandle(name, sandbox, script_name, timeout_seconds or 3600)

    def run_sandbox(self, handle: SandboxHandle) -> SandboxResultWithLogs:
        """Execute the handle's script, read /sandbox/output.json, and clean up.

        The sandbox is always deleted (best-effort) in the finally block.
        """
        try:
            cmd = ["python3", f"/sandbox/{handle.script_name}"]
            result = handle.sandbox.process.exec(cmd, timeout=handle.timeout)
            logs = result.stdout + ("\n[STDERR]\n" + result.stderr if result.stderr else "")

            try:
                output = json.loads(handle.sandbox.files.read("/sandbox/output.json"))
            except Exception:  # fixed: was a bare except (E722); file may be missing or hold invalid JSON
                output = {"success": False, "error": "Failed to read output.json"}

            return SandboxResultWithLogs(
                success=output.get("success", False), output=output.get("output"),
                error=output.get("error"), traceback=output.get("traceback"), logs=logs
            )
        finally:
            # Best-effort cleanup; deletion failure must not mask the result.
            try:
                handle.sandbox.delete()
            except Exception:  # fixed: was a bare except (E722)
                pass
|
|
||
|
|
||
def get_sandbox_manager(inference_gateway_url: str = None, backend: str = None):
    """Factory: returns BasilicaSandboxManager or SandboxManager."""
    # Explicit backend argument wins; otherwise fall back to the env var,
    # defaulting to the docker-based SandboxManager.
    chosen = backend or os.environ.get("RIDGES_SANDBOX_BACKEND", "docker")
    if chosen != "basilica":
        from evaluator.sandbox.sandbox_manager import SandboxManager
        return SandboxManager(inference_gateway_url)
    return BasilicaSandboxManager(inference_gateway_url)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,165 @@ | ||
| #!/usr/bin/env python3 | ||
| """ | ||
| Basilica Sandbox Integration Tests | ||
|
|
||
| Setup: | ||
| cd ../basilica-backend && sudo ./scripts/sandbox/local-container-e2e.sh setup | ||
| export BASILICA_API_URL=http://localhost:9080 BASILICA_API_TOKEN=dev-token | ||
|
|
||
| Usage: | ||
| python test_basilica_sandbox.py # Quick check | ||
| python test_basilica_sandbox.py --full # + concurrent evals | ||
| python test_basilica_sandbox.py --scale 30 # 30 concurrent evals | ||
| """ | ||
|
|
||
| import os, sys, time, json, tempfile, traceback, concurrent.futures | ||
| from uuid import uuid4 | ||
| from pathlib import Path | ||
| from threading import Lock | ||
|
|
||
| sys.path.insert(0, str(Path(__file__).parent)) | ||
|
|
||
| import click | ||
| import basilica | ||
| from basilica import python_sandbox | ||
| from evaluator.sandbox.basilica_sandbox_manager import BasilicaSandboxManager | ||
| from evaluator.problem_suites.polyglot.polyglot_suite import POLYGLOT_PY_SUITE | ||
| from models.problem import ProblemTestResultStatus | ||
|
|
||
# Configure the basilica SDK at import time, but only when a token is actually
# present. Fix: the original called basilica.configure() unconditionally with a
# possibly-empty api_key, which made importing this module for --help (or from
# other tooling) fail or misconfigure the SDK; main() still reports the missing
# token with a clear error before any test runs.
_API_TOKEN = os.environ.get("BASILICA_API_TOKEN")
if _API_TOKEN:
    basilica.configure(
        api_url=os.environ.get("BASILICA_API_URL", "http://localhost:9080"),
        api_key=_API_TOKEN,
    )


# Output helpers (were lambda assignments; PEP 8 / ruff E731 prefer def).
def ok(m):
    """Print a success line for message *m*."""
    print(f" ✓ {m}")


def fail(m):
    """Print a failure line for message *m*."""
    print(f" ✗ {m}")


def section(t):
    """Print a section divider titled *t*."""
    print(f"\n{'─'*60}\n {t}\n{'─'*60}")
|
|
||
|
|
||
def test_sdk():
    """Test SDK basics: sandbox creation, code execution, file I/O, cleanup."""
    section("SDK Connection")
    try:
        with python_sandbox(runtime="container") as sb:
            ok(f"Created: {sb.sandbox_id}")
            assert sb.process.run("print('Hello')").exit_code == 0
            ok("Code execution")
            sb.files.write("test.txt", "x")
            assert sb.files.read("test.txt") == "x"
            ok("File I/O")
        ok("Cleanup")
        return True
    except Exception as e:
        fail(str(e))
        traceback.print_exc()
        return False
|
|
||
|
|
||
def test_manager():
    """Round-trip a tiny doubling script through BasilicaSandboxManager."""
    section("SandboxManager")
    script = 'import json; d=json.load(open("/sandbox/input.json")); json.dump({"success":True,"output":d["x"]*2},open("/sandbox/output.json","w"))'

    with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
        f.write(script)
        path = f.name
    try:
        manager = BasilicaSandboxManager()
        handle = manager.initialize_sandbox(
            name="t", script_path=path, input_data={"x": 21}, timeout_seconds=60
        )
        ok(f"Initialized: {handle.sandbox.sandbox_id}")
        result = manager.run_sandbox(handle)
        assert result.success and result.output == 42
        ok(f"Result: {result.output}")
        return True
    except Exception as e:
        fail(str(e))
        traceback.print_exc()
        return False
    finally:
        os.unlink(path)
|
|
||
|
|
||
def test_polyglot(problem: str = "accumulate-py"):
    """Run one full Polyglot evaluation and report per-test pass/fail."""
    section(f"Polyglot: {problem}")
    if not POLYGLOT_PY_SUITE.has_problem_name(problem):
        fail(f"Problem '{problem}' not found")
        return False
    try:
        prob = POLYGLOT_PY_SUITE.get_problem(problem)
        manager = BasilicaSandboxManager()
        sb = POLYGLOT_PY_SUITE.initialize_eval_sandbox(
            manager, prob, uuid4(), prob.solution_diff, timeout_seconds=120
        )
        ok(f"Sandbox: {sb.sandbox.sandbox_id}")
        results, _ = POLYGLOT_PY_SUITE.run_eval_sandbox(manager, sb)
        passed = sum(1 for t in results if t.status == ProblemTestResultStatus.PASS)
        failed = sum(1 for t in results if t.status == ProblemTestResultStatus.FAIL)
        for t in results:
            print(f" {'✓' if t.status == ProblemTestResultStatus.PASS else '✗'} {t.name}")
        ok(f"Tests: {passed} passed, {failed} failed")
        return failed == 0
    except Exception as e:
        fail(str(e))
        traceback.print_exc()
        return False
|
|
||
|
|
||
def test_concurrent(count: int = 10, verbose: bool = True):
    """Run *count* evaluations in parallel and report throughput and success rate."""
    section(f"Concurrent: {count}")
    # Cycle through available problems so counts larger than the suite reuse them.
    names = list(POLYGLOT_PY_SUITE.problems.keys())
    problems = [
        POLYGLOT_PY_SUITE.get_problem(names[i % len(POLYGLOT_PY_SUITE.problems)])
        for i in range(count)
    ]
    stats = {"ok": 0, "fail": 0, "times": [], "tests": []}
    lock = Lock()
    t0 = time.time()

    def run(i):
        # Executed on a worker thread; all shared-state writes go under `lock`.
        p = problems[i]
        start = time.time()
        try:
            mgr = BasilicaSandboxManager()
            if verbose:
                with lock:
                    print(f" [{time.time()-t0:5.1f}s] #{i:02d} {p.name}: init...")
            sb = POLYGLOT_PY_SUITE.initialize_eval_sandbox(
                mgr, p, uuid4(), p.solution_diff, timeout_seconds=180
            )
            if verbose:
                with lock:
                    print(f" [{time.time()-t0:5.1f}s] #{i:02d} {p.name}: run...")
            results, _ = POLYGLOT_PY_SUITE.run_eval_sandbox(mgr, sb)
            passed = sum(1 for t in results if t.status == ProblemTestResultStatus.PASS)
            failed = sum(1 for t in results if t.status == ProblemTestResultStatus.FAIL)
            success = failed == 0 and passed > 0
            with lock:
                stats["ok" if success else "fail"] += 1
                stats["times"].append(time.time() - start)
                stats["tests"].append({"p": passed, "f": failed})
                if verbose:
                    print(f" [{time.time()-t0:5.1f}s] #{i:02d} {p.name}: {'✓' if success else '✗'} {passed}/{passed+failed} ({time.time()-start:.1f}s)")
        except Exception as e:
            with lock:
                stats["fail"] += 1
                stats["times"].append(time.time() - start)
                if verbose:
                    print(f" [{time.time()-t0:5.1f}s] #{i:02d} {p.name}: ✗ {str(e)[:40]}")

    print(f"\n Running {count} evals...\n")
    with concurrent.futures.ThreadPoolExecutor(max_workers=min(count, 30)) as ex:
        list(ex.map(run, range(count)))

    total = time.time() - t0
    print(f"\n {'─'*50}")
    print(f" Evals: {stats['ok']}/{count} | Tests: {sum(t['p'] for t in stats['tests'])} passed, {sum(t['f'] for t in stats['tests'])} failed")
    print(f" Time: {total:.1f}s total, {sum(stats['times'])/len(stats['times']):.1f}s avg, {count/total:.2f}/sec")
    rate = stats["ok"] / count * 100 if count else 0
    (ok if rate >= 80 else fail)(f"{rate:.0f}% success")
    return rate >= 80
|
|
||
|
|
||
@click.command()
@click.option("--full", is_flag=True, help="Include concurrent evals")
@click.option("--scale", default=0, type=int, help="N concurrent evals")
@click.option("--problem", default="accumulate-py", help="Problem name")
@click.option("--quiet", is_flag=True, help="Less output")
def main(full, scale, problem, quiet):
    """CLI entry point: run the selected test groups, exit non-zero on failure."""
    print("\n" + "=" * 60 + "\n Basilica Sandbox Tests\n" + "=" * 60)
    if not os.environ.get("BASILICA_API_TOKEN"):
        fail("BASILICA_API_TOKEN not set")
        sys.exit(1)

    tests = [
        ("SDK", test_sdk),
        ("Manager", test_manager),
        (f"Polyglot({problem})", lambda: test_polyglot(problem)),
    ]
    if full or scale > 0:
        tests.append(
            (f"Concurrent({scale or 10})", lambda: test_concurrent(scale or 10, not quiet))
        )

    results = [(label, fn()) for label, fn in tests]
    print("\n" + "=" * 60)
    for label, outcome in results:
        print(f" {'✓' if outcome else '✗'} {label}")
    print("=" * 60)
    all_passed = all(outcome for _, outcome in results)
    print(f" {'✓ All passed' if all_passed else '✗ Some failed'}")
    sys.exit(0 if all_passed else 1)


if __name__ == "__main__":
    main()
Review note: silently passing on UnicodeDecodeError/IOError in the mount loop means binary files, or files with encoding problems, are never uploaded to the sandbox — and the user gets no signal that anything was skipped. Consider logging a warning for each skipped file, or supporting binary uploads (e.g. reading in "rb" mode and writing through a bytes-capable file API).