Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion components/ambient-api-server/pkg/rbac/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (m *DBAuthorizationMiddleware) AuthorizeApi(next http.Handler) http.Handler
}

func (m *DBAuthorizationMiddleware) isAllowed(g *gorm.DB, username, method, path string) (bool, error) {
action := httpMethodToAction(method)
action := pathToAction(method, path)
resource := pathToResource(path)

var bindings []roleBindingRow
Expand Down Expand Up @@ -130,6 +130,22 @@ func httpMethodToAction(method string) string {
}
}

func pathToAction(method, path string) string {
parts := strings.Split(strings.TrimPrefix(path, "/"), "/")
for i, p := range parts {
if p == "v1" && i+2 < len(parts) {
last := parts[len(parts)-1]
switch last {
case "token":
return "token"
case "start", "stop", "ignite", "ignition":
return last
}
}
}
return httpMethodToAction(method)
}

func pathToResource(path string) string {
parts := strings.Split(strings.TrimPrefix(path, "/"), "/")
for i, p := range parts {
Expand Down
47 changes: 42 additions & 5 deletions components/ambient-cli/cmd/acpctl/session/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func runSend(cmd *cobra.Command, args []string) error {

out := cmd.OutOrStdout()
scanner := bufio.NewScanner(stream)
var reasoningBuf strings.Builder
var inText bool
for scanner.Scan() {
line := scanner.Text()
if !strings.HasPrefix(line, "data: ") {
Expand All @@ -91,18 +93,53 @@ func runSend(cmd *cobra.Command, args []string) error {
}

var evt struct {
Type string `json:"type"`
Delta string `json:"delta"`
Type string `json:"type"`
Delta string `json:"delta"`
ToolCallName string `json:"toolCallName"`
Content string `json:"content"`
}
if err := json.Unmarshal([]byte(data), &evt); err != nil {
continue
}
if evt.Type == "TEXT_MESSAGE_CONTENT" && evt.Delta != "" {
fmt.Fprint(out, evt.Delta)
switch evt.Type {
case "REASONING_MESSAGE_CONTENT":
reasoningBuf.WriteString(evt.Delta)
case "REASONING_END":
if reasoningBuf.Len() > 0 {
fmt.Fprintf(out, "[thinking] %s\n", strings.TrimSpace(reasoningBuf.String()))
reasoningBuf.Reset()
}
case "TEXT_MESSAGE_CONTENT":
if evt.Delta != "" {
inText = true
fmt.Fprint(out, evt.Delta)
}
case "TEXT_MESSAGE_END":
if inText {
fmt.Fprintln(out)
inText = false
}
case "TOOL_CALL_START":
if evt.ToolCallName != "" {
fmt.Fprintf(out, "[%s] ", evt.ToolCallName)
}
case "TOOL_CALL_RESULT":
if evt.Content != "" {
var content string
if err := json.Unmarshal([]byte(evt.Content), &content); err != nil {
content = evt.Content
}
lines := strings.SplitN(strings.TrimSpace(content), "\n", 4)
preview := strings.Join(lines, " | ")
if len(lines) >= 4 {
preview += " ..."
}
fmt.Fprintf(out, "→ %s\n", preview)
Comment on lines +127 to +137
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Raise the scanner limit before previewing tool results.

bufio.Scanner still caps each SSE line at 64 KiB. Now that TOOL_CALL_RESULT consumes full evt.Content, one large result can terminate acpctl session send -f with bufio.ErrTooLong.

Patch
 	out := cmd.OutOrStdout()
 	scanner := bufio.NewScanner(stream)
+	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
 	var reasoningBuf strings.Builder
 	var inText bool

Expected: this file shows no scanner.Buffer call today; if emitters stream full tool-result payloads, the current scanner limit is enough to abort the stream.

#!/bin/bash
set -euo pipefail

sed -n '78,86p' components/ambient-cli/cmd/acpctl/session/send.go
printf '\n'
rg -n -C2 'scanner\.Buffer|bufio\.NewScanner' components/ambient-cli/cmd/acpctl/session/send.go
printf '\n'
rg -n -C2 'TOOL_CALL_RESULT|toolCallName|content' .
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/ambient-cli/cmd/acpctl/session/send.go` around lines 127 - 137,
The SSE reader uses bufio.NewScanner which defaults to a 64KiB token limit
causing bufio.ErrTooLong when TOOL_CALL_RESULT events have large evt.Content;
locate the bufio.NewScanner(...) instance (variable name scanner) used in the
session send flow and call scanner.Buffer(make([]byte, 1024*1024), 10*1024*1024)
(or another appropriate larger max like 5–10MB) immediately after creating the
scanner to raise the token limit, and keep existing evt.Content handling (the
preview code around evt.Content) intact; optionally ensure any bufio.ErrTooLong
is handled/logged if you still want to detect oversized tokens.

}
}
}

if !sendFollowJSON {
if inText {
fmt.Fprintln(out)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ metadata:
labels:
app: ambient-api-server
component: api
annotations:
haproxy.router.openshift.io/timeout: 10m
spec:
to:
kind: Service
Expand All @@ -23,6 +25,8 @@ metadata:
labels:
app: ambient-api-server
component: grpc
annotations:
haproxy.router.openshift.io/timeout: 10m
spec:
to:
kind: Service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ metadata:
app: ambient-api-server
component: api
shard: internal
annotations:
haproxy.router.openshift.io/timeout: 10m
Comment thread
coderabbitai[bot] marked this conversation as resolved.
spec:
to:
kind: Service
Expand All @@ -26,6 +28,8 @@ metadata:
app: ambient-api-server
component: grpc
shard: internal
annotations:
haproxy.router.openshift.io/timeout: 10m
spec:
to:
kind: Service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ metadata:
labels:
app: ambient-api-server
component: api
annotations:
haproxy.router.openshift.io/timeout: 10m
spec:
to:
kind: Service
Expand All @@ -23,6 +25,8 @@ metadata:
labels:
app: ambient-api-server
component: grpc
annotations:
haproxy.router.openshift.io/timeout: 10m
spec:
to:
kind: Service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,14 +515,19 @@ def end_turn(
return

try:
from claude_agent_sdk import TextBlock
# Extract text content — TextBlock is optional (claude_agent_sdk may not be installed)
try:
from claude_agent_sdk import TextBlock as _TextBlock
except ImportError:
_TextBlock = None

# Extract text content
text_content = []
message_content = getattr(message, "content", []) or []
for blk in message_content:
if isinstance(blk, TextBlock):
if _TextBlock is not None and isinstance(blk, _TextBlock):
text_content.append(getattr(blk, "text", ""))
elif hasattr(blk, "text") and isinstance(getattr(blk, "text", None), str):
text_content.append(blk.text)

output_text = (
"\n".join(text_content) if text_content else "(no text output)"
Expand Down
41 changes: 34 additions & 7 deletions components/runners/ambient-runner/ambient_runner/platform/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from urllib.parse import urlparse

from ambient_runner.platform.context import RunnerContext
from ambient_runner.platform.utils import get_bot_token
from ambient_runner.platform.utils import get_bot_token, refresh_bot_token

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -183,18 +183,45 @@ def _do_req():
f"and BOT_TOKEN fallback also failed"
) from fallback_err
if e.code in (401, 403):
logger.warning(
f"{credential_type} credential fetch failed with HTTP {e.code}: {e}"
)
raise PermissionError(
f"{credential_type} authentication failed with HTTP {e.code}"
) from e
# BOT_TOKEN may have expired — refresh from CP endpoint and retry once.
return _retry_with_fresh_bot_token(e.code)
Comment on lines 185 to +187
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Handle 401/403 from the caller-token BOT_TOKEN fallback too.

This retry only runs when the first request was sent with BOT_TOKEN. If the first request used context.caller_token, Lines 177-184 raise as soon as the fallback BOT_TOKEN gets 401/403, so the fresh-token retry never happens and long-running sessions can still fail on an expired cached bot token.

Patch idea
                 try:
                     with _urllib_request.urlopen(fallback_req, timeout=10) as resp:
                         return resp.read().decode("utf-8", errors="replace")
+                except _urllib_request.HTTPError as fallback_err:
+                    if fallback_err.code in (401, 403):
+                        logger.warning(
+                            f"{credential_type} BOT_TOKEN fallback got {fallback_err.code}; refreshing from CP endpoint and retrying"
+                        )
+                        return _retry_with_fresh_bot_token(fallback_err.code)
+                    logger.warning(
+                        f"{credential_type} BOT_TOKEN fallback also failed: {fallback_err}"
+                    )
+                    raise PermissionError(
+                        f"{credential_type} authentication failed with HTTP {fallback_err.code}"
+                    ) from fallback_err
                 except Exception as fallback_err:
                     logger.warning(
                         f"{credential_type} BOT_TOKEN fallback also failed: {fallback_err}"
                     )
                     raise PermissionError(
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/runners/ambient-runner/ambient_runner/platform/auth.py` around
lines 185 - 187, The code currently retries with a fresh BOT_TOKEN only when the
initial request used BOT_TOKEN; if the initial request used context.caller_token
and the fallback BOT_TOKEN then returns 401/403 the code raises immediately
instead of calling the same retry path. Update the exception handling in the
block that falls back from context.caller_token to BOT_TOKEN so that when the
fallback response error code (e.code) is 401 or 403 it calls
_retry_with_fresh_bot_token(e.code) instead of re-raising; keep existing
behavior for other error codes. Ensure you reference the context.caller_token
flow, the BOT_TOKEN fallback branch, the exception object e (e.code), and the
_retry_with_fresh_bot_token helper when making the change.

logger.warning(f"{credential_type} credential fetch failed: {e}")
return ""
except Exception as e:
logger.warning(f"{credential_type} credential fetch failed: {e}")
return ""

def _retry_with_fresh_bot_token(original_code: int):
logger.info(
f"{credential_type} got {original_code} with cached BOT_TOKEN — refreshing from CP endpoint and retrying"
)
try:
fresh_bot = refresh_bot_token()
except Exception as refresh_err:
logger.warning(f"{credential_type} CP token refresh failed: {refresh_err}")
raise PermissionError(
f"{credential_type} authentication failed with HTTP {original_code}"
) from refresh_err
Comment on lines +194 to +204
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Coalesce CP token refreshes across concurrent failures.

Line 209 can run once per failing provider fetch. populate_runtime_credentials() fans out four credential fetches concurrently (Lines 349-356), while ambient_runner/platform/utils.py:31-92 updates the shared bot-token cache without locking. One stale BOT_TOKEN can therefore trigger a refresh stampede and race the cache, so the intermittent auth failures this PR is fixing can still reproduce under load.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/runners/ambient-runner/ambient_runner/platform/auth.py` around
lines 204 - 214, The _retry_with_fresh_bot_token path can trigger concurrent
refreshes when populate_runtime_credentials fans out multiple credential
fetches; fix this by coalescing refreshes for refresh_bot_token so only one
in-flight refresh runs at a time and other callers await its result (e.g., add a
module-level in-progress future or an asyncio.Lock + cached result inside the
refresh logic used by _retry_with_fresh_bot_token and the shared bot-token cache
update in ambient_runner/platform/utils.py), ensure refresh errors propagate to
waiting callers and update the cache only once with the fresh token.

retry_req = _urllib_request.Request(url, method="GET")
if fresh_bot:
retry_req.add_header("Authorization", f"Bearer {fresh_bot}")
if context.current_user_id:
retry_req.add_header("X-Runner-Current-User", context.current_user_id)
try:
with _urllib_request.urlopen(retry_req, timeout=10) as resp:
Comment on lines +205 to +211
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

printf 'Guard and retry callsites:\n'
sed -n '121,131p;215,221p' components/runners/ambient-runner/ambient_runner/platform/auth.py

printf '\nurlparse() behavior for representative BACKEND_API_URL values:\n'
python - <<'PY'
from urllib.parse import urlparse

samples = [
    "https://ambient-api-server.svc.cluster.local",
    "http://localhost:8080",
    "file:///tmp",
    "data:text/plain,hello",
]

for value in samples:
    p = urlparse(value)
    allowed = bool(p.hostname) and (
        p.hostname.endswith(".svc.cluster.local")
        or p.hostname.endswith(".svc")
        or p.hostname in {"localhost", "127.0.0.1"}
    )
    print(
        f"{value!r} -> scheme={p.scheme!r}, hostname={p.hostname!r}, "
        f"current_guard_rejects={bool(p.hostname and not allowed)}"
    )
PY

Repository: ambient-code/platform

Length of output: 1365


Guard hostless and non-HTTP(S) schemes in addition to hostname allowlist.

The current guard if parsed.hostname and not (allowlisted) fails for file:///tmp and data:text/... URIs—both return hostname=None and thus skip the rejection check. These bypass the cluster-only boundary and expose credentials via unintended schemes. Add checks for explicit scheme allowlist (http, https only) and reject any URL missing parsed.hostname.

🧰 Tools
🪛 Ruff (0.15.9)

[error] 215-215: Audit URL open for permitted schemes. Allowing use of file: or custom schemes is often unexpected.

(S310)


[error] 221-221: Audit URL open for permitted schemes. Allowing use of file: or custom schemes is often unexpected.

(S310)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/runners/ambient-runner/ambient_runner/platform/auth.py` around
lines 215 - 221, Ensure the URL is explicitly an http/https host and not a
hostless scheme: after parsing the URL (the parsed variable) and before creating
retry_req or adding headers (where _urllib_request.Request and retry_req are
used and Authorization/X-Runner-Current-User are added), enforce that
parsed.scheme is either "http" or "https" and that parsed.hostname is present;
if not, reject the request (raise/return an error) rather than proceeding or
relying solely on the hostname allowlist check.

logger.info(f"{credential_type} retry with fresh BOT_TOKEN succeeded")
return resp.read().decode("utf-8", errors="replace")
except _urllib_request.HTTPError as retry_err:
logger.warning(f"{credential_type} retry with fresh BOT_TOKEN failed: {retry_err}")
raise PermissionError(
f"{credential_type} authentication failed with HTTP {retry_err.code}"
) from retry_err
except Exception as retry_err:
logger.warning(f"{credential_type} retry with fresh BOT_TOKEN failed: {retry_err}")
raise PermissionError(
f"{credential_type} authentication failed with HTTP {original_code}"
) from retry_err

resp_text = await loop.run_in_executor(None, _do_req)
if not resp_text:
return {}
Expand Down
21 changes: 21 additions & 0 deletions components/runners/ambient-runner/ambient_runner/platform/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,27 @@ def get_bot_token() -> str:
return (os.getenv("BOT_TOKEN") or "").strip()


def refresh_bot_token() -> str:
"""Fetch a fresh token from the CP token endpoint and update the in-process cache.

Returns the new token, or the current cached token if the CP endpoint is not
configured (local dev mode). Raises RuntimeError if the CP fetch fails.
"""
cp_token_url = os.getenv("AMBIENT_CP_TOKEN_URL", "")
if not cp_token_url:
return get_bot_token()

public_key_pem = os.getenv("AMBIENT_CP_TOKEN_PUBLIC_KEY", "")
session_id = os.getenv("SESSION_ID", "")
if not public_key_pem or not session_id:
logger.warning("refresh_bot_token: CP env vars incomplete, skipping refresh")
return get_bot_token()

from ambient_runner._grpc_client import _fetch_token_from_cp

return _fetch_token_from_cp(cp_token_url, public_key_pem, session_id)


def is_env_truthy(value: str) -> bool:
"""Return True for "1", "true", or "yes" (case-insensitive)."""
return value.strip().lower() in _TRUTHY_VALUES
Expand Down
Loading