Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 100 additions & 92 deletions mobilerun/agent/utils/oauth/anthropic_oauth_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import hashlib
import json
import os
import queue
import secrets
import sys
import threading
Expand Down Expand Up @@ -64,6 +65,10 @@ def _pkce_pair() -> tuple[str, str]:
return verifier, challenge


# Keep in sync with _MAX_CODE_ATTEMPTS in gemini_oauth_code_assist_llm.py
_MAX_CODE_ATTEMPTS = 2


def _is_headless_environment() -> bool:
"""Detect SSH, WSL, or missing display where browser popups won't work."""
if os.environ.get("SSH_CONNECTION") or os.environ.get("SSH_TTY"):
Expand Down Expand Up @@ -411,9 +416,19 @@ def login(
callback_path: str = "/callback",
expires_in: Optional[int] = None,
) -> str:
# Headless environments: skip local server, use hosted callback page
use_headless = _is_headless_environment() or os.environ.get(
"DROIDRUN_OAUTH_MANUAL", ""
).lower() in ("1", "true", "yes")
if use_headless:
return self.login_headless(
open_browser=open_browser,
timeout_seconds=timeout_seconds,
expires_in=expires_in,
)

# Desktop: browser callback server
result: Dict[str, Optional[str]] = {"code": None, "state": None, "error": None}
manual_code: Dict[str, Optional[str]] = {"code": None}
manual_failed = threading.Event()
done = threading.Event()

code_verifier, code_challenge = _pkce_pair()
Expand Down Expand Up @@ -459,10 +474,12 @@ def log_message(self, format: str, *args: Any) -> None: # noqa: A003
self.authorize_url = original_authorize_url
print(
f"Could not bind callback server on {callback_host}:{callback_port} ({exc}). "
"Falling back to manual code entry."
"Falling back to headless code entry."
)
return self.login_manual(
open_browser=open_browser, expires_in=expires_in
return self.login_headless(
open_browser=open_browser,
timeout_seconds=timeout_seconds,
expires_in=expires_in,
)

actual_port = httpd.server_address[1]
Expand All @@ -481,72 +498,18 @@ def log_message(self, format: str, *args: Any) -> None: # noqa: A003
if open_browser:
webbrowser.open(auth_url)

# Only run the manual-paste race when we can't rely on the local
# browser callback: headless envs (SSH/WSL/no-display), or when the
# user explicitly opts in with DROIDRUN_OAUTH_MANUAL=1. On a normal
# desktop the server always wins anyway, and a blocked input()
# thread would intercept InquirerPy's terminal queries and lag the
# configure wizard.
enable_manual = _is_headless_environment() or os.environ.get(
"DROIDRUN_OAUTH_MANUAL", ""
).lower() in ("1", "true", "yes")
if enable_manual:
def _read_manual() -> None:
for attempt in range(2):
if done.is_set():
return
try:
raw = str(input("Or paste the redirect URL / authorization code: "))
except Exception:
return
if done.is_set():
return
if not raw.strip():
if attempt == 0:
print("Invalid paste. Try again.")
continue
if not done.is_set():
manual_failed.set()
done.set()
return
try:
code = _normalize_manual_code(raw, state)
except Exception: # noqa: BLE001
if attempt == 0:
print("Invalid paste. Try again.")
continue
print("Invalid paste.")
if not done.is_set():
manual_failed.set()
done.set()
return
if code:
manual_code["code"] = code
done.set()
return

manual_thread = threading.Thread(target=_read_manual, daemon=True)
manual_thread.start()

if not done.wait(timeout=timeout_seconds):
raise TimeoutError("OAuth login timed out before callback was received.")

if manual_failed.is_set():
raise RuntimeError("Login failed.")

if manual_code["code"]:
code_to_exchange = manual_code["code"]
else:
if result["error"]:
raise RuntimeError(f"OAuth callback returned error: {result['error']}")
if result["state"] != state:
raise RuntimeError("OAuth callback state mismatch.")
if not result["code"]:
raise RuntimeError("OAuth callback did not include an authorization code.")
code_to_exchange = result["code"]
if result["error"]:
raise RuntimeError(f"OAuth callback returned error: {result['error']}")
if result["state"] != state:
raise RuntimeError("OAuth callback state mismatch.")
if not result["code"]:
raise RuntimeError("OAuth callback did not include an authorization code.")

return self._exchange_authorization_code(
code=code_to_exchange,
code=result["code"],
redirect_uri=redirect_uri,
code_verifier=code_verifier,
state=state,
Expand All @@ -557,13 +520,15 @@ def _read_manual() -> None:
httpd.shutdown()
httpd.server_close()

def login_manual(
def login_headless(
self,
*,
open_browser: bool = True,
open_browser: bool = False,
timeout_seconds: float = 300.0,
input_fn: Any = input,
expires_in: Optional[int] = None,
) -> str:
"""Headless OAuth flow for SSH/WSL environments."""
code_verifier, code_challenge = _pkce_pair()
state = _b64_no_pad(secrets.token_bytes(32))
redirect_uri = "https://platform.claude.com/oauth/code/callback"
Expand All @@ -579,36 +544,79 @@ def login_manual(
)

try:
print(f"Open this URL to login:\n{auth_url}")
print(
f"\nSign in with your Anthropic account:\n"
f"\n1. Open this link in your browser:\n {auth_url}\n"
f"\n2. Complete sign-in, then paste the authorization code shown on the page.\n"
)
if open_browser:
webbrowser.open(auth_url)
for attempt in range(2):
raw = str(input_fn("Paste the redirect URL or authorization code: "))
if not raw.strip():
if attempt == 0:
print("Invalid paste. Try again.")
continue
raise RuntimeError("Login failed.")
try:
code = _normalize_manual_code(raw, state)
except Exception: # noqa: BLE001

deadline = time.time() + timeout_seconds
input_queue: queue.Queue[Optional[str]] = queue.Queue()
stop = threading.Event()
need_more = threading.Event()

def _reader() -> None:
for _ in range(_MAX_CODE_ATTEMPTS):
need_more.wait()
need_more.clear()
if stop.is_set():
return
try:
input_queue.put(str(input_fn("Enter the authorization code: ")))
except (EOFError, OSError):
input_queue.put(None)
return

threading.Thread(target=_reader, daemon=True).start()

try:
for attempt in range(_MAX_CODE_ATTEMPTS):
remaining = deadline - time.time()
if remaining <= 0:
raise TimeoutError("OAuth login timed out.")

need_more.set()

try:
raw = input_queue.get(timeout=remaining)
except queue.Empty:
raise TimeoutError("OAuth login timed out.") from None

if raw is None:
raise RuntimeError("Login failed — stdin closed.")
if not raw.strip():
if attempt == 0:
print("No code entered. Try again.")
continue
raise RuntimeError("Login failed.")
try:
code = _normalize_manual_code(raw, state)
except Exception: # noqa: BLE001
if attempt == 0:
print("Invalid code. Try again.")
continue
raise RuntimeError("Login failed.")
if code:
return self._exchange_authorization_code(
code=code,
redirect_uri=redirect_uri,
code_verifier=code_verifier,
state=state,
expires_in=expires_in,
)
if attempt == 0:
print("Invalid paste. Try again.")
print("Invalid code. Try again.")
continue
raise RuntimeError("Login failed.")
if code:
return self._exchange_authorization_code(
code=code,
redirect_uri=redirect_uri,
code_verifier=code_verifier,
state=state,
expires_in=expires_in,
)
if attempt == 0:
print("Invalid paste. Try again.")
continue
raise RuntimeError("Login failed.")
raise RuntimeError("Login failed.")
finally:
# stop.set() prevents the reader from starting a new input() call.
# It cannot interrupt an input() already in progress (Python limitation);
# the daemon thread dies with the process.
stop.set()
need_more.set()
finally:
self.authorize_url = original_authorize_url

Expand Down
Loading
Loading