Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 127 additions & 27 deletions mobilerun/agent/utils/oauth/anthropic_oauth_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import os
import secrets
import sys
import threading
import time
import webbrowser
Expand Down Expand Up @@ -63,17 +64,36 @@ def _pkce_pair() -> tuple[str, str]:
return verifier, challenge


def _is_headless_environment() -> bool:
"""Detect SSH, WSL, or missing display where browser popups won't work."""
if os.environ.get("SSH_CONNECTION") or os.environ.get("SSH_TTY"):
return True
if os.environ.get("WSL_DISTRO_NAME"):
return True
if sys.platform.startswith("linux"):
if not os.environ.get("DISPLAY") and not os.environ.get("WAYLAND_DISPLAY"):
return True
return False


def _normalize_manual_code(raw: str, expected_state: str) -> str:
value = raw.strip()
if not value:
return value

first_token = value.split()[0]

if "code=" in first_token:
if "error=" in first_token or "code=" in first_token:
parsed = urlparse(first_token)
params = parse_qs(parsed.query)
error = params.get("error", [None])[0]
if error:
desc = params.get("error_description", [error])[0]
raise RuntimeError(f"OAuth error: {desc}")
code = params.get("code", [None])[0]
state_from_url = params.get("state", [None])[0]
if state_from_url and state_from_url != expected_state:
raise RuntimeError("OAuth manual code state mismatch.")
if isinstance(code, str) and code:
return code

Expand Down Expand Up @@ -392,6 +412,8 @@ def login(
expires_in: Optional[int] = None,
) -> str:
result: Dict[str, Optional[str]] = {"code": None, "state": None, "error": None}
manual_code: Dict[str, Optional[str]] = {"code": None}
manual_failed = threading.Event()
done = threading.Event()

code_verifier, code_challenge = _pkce_pair()
Expand Down Expand Up @@ -431,7 +453,18 @@ def do_GET(self) -> None: # noqa: N802
def log_message(self, format: str, *args: Any) -> None: # noqa: A003
return

httpd = HTTPServer((callback_host, callback_port), _OAuthHandler)
try:
httpd = HTTPServer((callback_host, callback_port), _OAuthHandler)
except OSError as exc:
self.authorize_url = original_authorize_url
print(
f"Could not bind callback server on {callback_host}:{callback_port} ({exc}). "
"Falling back to manual code entry."
)
return self.login_manual(
open_browser=open_browser, expires_in=expires_in
)

actual_port = httpd.server_address[1]
redirect_uri = f"http://localhost:{actual_port}{callback_path}"
auth_url = self._build_auth_url(
Expand All @@ -442,23 +475,78 @@ def log_message(self, format: str, *args: Any) -> None: # noqa: A003

server_thread = threading.Thread(target=httpd.serve_forever, daemon=True)
server_thread.start()

try:
print(f"Open this URL to login:\n{auth_url}\n")
if open_browser:
webbrowser.open(auth_url)
else:
print(f"Open this URL to login:\n{auth_url}")

# Only run the manual-paste race when we can't rely on the local
# browser callback: headless envs (SSH/WSL/no-display), or when the
# user explicitly opts in with DROIDRUN_OAUTH_MANUAL=1. On a normal
# desktop the server always wins anyway, and a blocked input()
# thread would intercept InquirerPy's terminal queries and lag the
# configure wizard.
enable_manual = _is_headless_environment() or os.environ.get(
"DROIDRUN_OAUTH_MANUAL", ""
).lower() in ("1", "true", "yes")
if enable_manual:
def _read_manual() -> None:
for attempt in range(2):
if done.is_set():
return
try:
raw = str(input("Or paste the redirect URL / authorization code: "))
except Exception:
return
if done.is_set():
return
if not raw.strip():
if attempt == 0:
print("Invalid paste. Try again.")
continue
if not done.is_set():
manual_failed.set()
done.set()
return
try:
code = _normalize_manual_code(raw, state)
except Exception: # noqa: BLE001
if attempt == 0:
print("Invalid paste. Try again.")
continue
print("Invalid paste.")
if not done.is_set():
manual_failed.set()
done.set()
return
if code:
manual_code["code"] = code
done.set()
return

manual_thread = threading.Thread(target=_read_manual, daemon=True)
manual_thread.start()

if not done.wait(timeout=timeout_seconds):
raise TimeoutError("OAuth login timed out before callback was received.")
if result["error"]:
raise RuntimeError(f"OAuth callback returned error: {result['error']}")
if result["state"] != state:
raise RuntimeError("OAuth callback state mismatch.")
if not result["code"]:
raise RuntimeError("OAuth callback did not include an authorization code.")

if manual_failed.is_set():
raise RuntimeError("Login failed.")

if manual_code["code"]:
code_to_exchange = manual_code["code"]
else:
if result["error"]:
raise RuntimeError(f"OAuth callback returned error: {result['error']}")
if result["state"] != state:
raise RuntimeError("OAuth callback state mismatch.")
if not result["code"]:
raise RuntimeError("OAuth callback did not include an authorization code.")
code_to_exchange = result["code"]

return self._exchange_authorization_code(
code=result["code"],
code=code_to_exchange,
redirect_uri=redirect_uri,
code_verifier=code_verifier,
state=state,
Expand Down Expand Up @@ -491,26 +579,38 @@ def login_manual(
)

try:
print(f"Open this URL to login:\n{auth_url}")
if open_browser:
webbrowser.open(auth_url)
print(f"Open this URL to login:\n{auth_url}")
code = _normalize_manual_code(
str(input_fn("Paste authorization code: ")),
state,
)
if not code:
raise ValueError("Authorization code was empty.")

return self._exchange_authorization_code(
code=code,
redirect_uri=redirect_uri,
code_verifier=code_verifier,
state=state,
expires_in=expires_in,
)
for attempt in range(2):
raw = str(input_fn("Paste the redirect URL or authorization code: "))
if not raw.strip():
if attempt == 0:
print("Invalid paste. Try again.")
continue
raise RuntimeError("Login failed.")
try:
code = _normalize_manual_code(raw, state)
except Exception: # noqa: BLE001
if attempt == 0:
print("Invalid paste. Try again.")
continue
raise RuntimeError("Login failed.")
if code:
return self._exchange_authorization_code(
code=code,
redirect_uri=redirect_uri,
code_verifier=code_verifier,
state=state,
expires_in=expires_in,
)
if attempt == 0:
print("Invalid paste. Try again.")
continue
raise RuntimeError("Login failed.")
raise RuntimeError("Login failed.")
finally:
self.authorize_url = original_authorize_url


def _resolve_access_token(self) -> str:
env_access_token = os.environ.get("ANTHROPIC_OAUTH_TOKEN")
Expand Down
Loading
Loading