diff --git a/mobilerun/agent/utils/oauth/anthropic_oauth_llm.py b/mobilerun/agent/utils/oauth/anthropic_oauth_llm.py index ce723f07..555c4223 100644 --- a/mobilerun/agent/utils/oauth/anthropic_oauth_llm.py +++ b/mobilerun/agent/utils/oauth/anthropic_oauth_llm.py @@ -2,6 +2,7 @@ import hashlib import json import os +import queue import secrets import sys import threading @@ -64,6 +65,10 @@ def _pkce_pair() -> tuple[str, str]: return verifier, challenge +# Keep in sync with _MAX_CODE_ATTEMPTS in gemini_oauth_code_assist_llm.py +_MAX_CODE_ATTEMPTS = 2 + + def _is_headless_environment() -> bool: """Detect SSH, WSL, or missing display where browser popups won't work.""" if os.environ.get("SSH_CONNECTION") or os.environ.get("SSH_TTY"): @@ -411,9 +416,19 @@ def login( callback_path: str = "/callback", expires_in: Optional[int] = None, ) -> str: + # Headless environments: skip local server, use hosted callback page + use_headless = _is_headless_environment() or os.environ.get( + "DROIDRUN_OAUTH_MANUAL", "" + ).lower() in ("1", "true", "yes") + if use_headless: + return self.login_headless( + open_browser=open_browser, + timeout_seconds=timeout_seconds, + expires_in=expires_in, + ) + + # Desktop: browser callback server result: Dict[str, Optional[str]] = {"code": None, "state": None, "error": None} - manual_code: Dict[str, Optional[str]] = {"code": None} - manual_failed = threading.Event() done = threading.Event() code_verifier, code_challenge = _pkce_pair() @@ -459,10 +474,12 @@ def log_message(self, format: str, *args: Any) -> None: # noqa: A003 self.authorize_url = original_authorize_url print( f"Could not bind callback server on {callback_host}:{callback_port} ({exc}). " - "Falling back to manual code entry." + "Falling back to headless code entry." ) - return self.login_manual( - open_browser=open_browser, expires_in=expires_in + return self.login_headless( + open_browser=open_browser, + timeout_seconds=timeout_seconds, + expires_in=expires_in, ) actual_port = httpd.server_address[1] @@ -481,72 +498,18 @@ def log_message(self, format: str, *args: Any) -> None: # noqa: A003 if open_browser: webbrowser.open(auth_url) - # Only run the manual-paste race when we can't rely on the local - # browser callback: headless envs (SSH/WSL/no-display), or when the - # user explicitly opts in with DROIDRUN_OAUTH_MANUAL=1. On a normal - # desktop the server always wins anyway, and a blocked input() - # thread would intercept InquirerPy's terminal queries and lag the - # configure wizard. - enable_manual = _is_headless_environment() or os.environ.get( - "DROIDRUN_OAUTH_MANUAL", "" - ).lower() in ("1", "true", "yes") - if enable_manual: - def _read_manual() -> None: - for attempt in range(2): - if done.is_set(): - return - try: - raw = str(input("Or paste the redirect URL / authorization code: ")) - except Exception: - return - if done.is_set(): - return - if not raw.strip(): - if attempt == 0: - print("Invalid paste. Try again.") - continue - if not done.is_set(): - manual_failed.set() - done.set() - return - try: - code = _normalize_manual_code(raw, state) - except Exception: # noqa: BLE001 - if attempt == 0: - print("Invalid paste. Try again.") - continue - print("Invalid paste.") - if not done.is_set(): - manual_failed.set() - done.set() - return - if code: - manual_code["code"] = code - done.set() - return - - manual_thread = threading.Thread(target=_read_manual, daemon=True) - manual_thread.start() - if not done.wait(timeout=timeout_seconds): raise TimeoutError("OAuth login timed out before callback was received.") - if manual_failed.is_set(): - raise RuntimeError("Login failed.") - - if manual_code["code"]: - code_to_exchange = manual_code["code"] - else: - if result["error"]: - raise RuntimeError(f"OAuth callback returned error: {result['error']}") - if result["state"] != state: - raise RuntimeError("OAuth callback state mismatch.") - if not result["code"]: - raise RuntimeError("OAuth callback did not include an authorization code.") - code_to_exchange = result["code"] + if result["error"]: + raise RuntimeError(f"OAuth callback returned error: {result['error']}") + if result["state"] != state: + raise RuntimeError("OAuth callback state mismatch.") + if not result["code"]: + raise RuntimeError("OAuth callback did not include an authorization code.") return self._exchange_authorization_code( - code=code_to_exchange, + code=result["code"], redirect_uri=redirect_uri, code_verifier=code_verifier, state=state, @@ -557,13 +520,15 @@ def _read_manual() -> None: httpd.shutdown() httpd.server_close() - def login_manual( + def login_headless( self, *, - open_browser: bool = True, + open_browser: bool = False, + timeout_seconds: float = 300.0, input_fn: Any = input, expires_in: Optional[int] = None, ) -> str: + """Headless OAuth flow for SSH/WSL environments.""" code_verifier, code_challenge = _pkce_pair() state = _b64_no_pad(secrets.token_bytes(32)) redirect_uri = "https://platform.claude.com/oauth/code/callback" @@ -579,36 +544,79 @@ def login_manual( ) try: - print(f"Open this URL to login:\n{auth_url}") + print( + f"\nSign in with your Anthropic account:\n" + f"\n1. Open this link in your browser:\n {auth_url}\n" + f"\n2. Complete sign-in, then paste the authorization code shown on the page.\n" + ) if open_browser: webbrowser.open(auth_url) - for attempt in range(2): - raw = str(input_fn("Paste the redirect URL or authorization code: ")) - if not raw.strip(): - if attempt == 0: - print("Invalid paste. Try again.") - continue - raise RuntimeError("Login failed.") - try: - code = _normalize_manual_code(raw, state) - except Exception: # noqa: BLE001 + + deadline = time.time() + timeout_seconds + input_queue: queue.Queue[Optional[str]] = queue.Queue() + stop = threading.Event() + need_more = threading.Event() + + def _reader() -> None: + for _ in range(_MAX_CODE_ATTEMPTS): + need_more.wait() + need_more.clear() + if stop.is_set(): + return + try: + input_queue.put(str(input_fn("Enter the authorization code: "))) + except (EOFError, OSError): + input_queue.put(None) + return + + threading.Thread(target=_reader, daemon=True).start() + + try: + for attempt in range(_MAX_CODE_ATTEMPTS): + remaining = deadline - time.time() + if remaining <= 0: + raise TimeoutError("OAuth login timed out.") + + need_more.set() + + try: + raw = input_queue.get(timeout=remaining) + except queue.Empty: + raise TimeoutError("OAuth login timed out.") from None + + if raw is None: + raise RuntimeError("Login failed — stdin closed.") + if not raw.strip(): + if attempt == 0: + print("No code entered. Try again.") + continue + raise RuntimeError("Login failed.") + try: + code = _normalize_manual_code(raw, state) + except Exception: # noqa: BLE001 + if attempt == 0: + print("Invalid code. Try again.") + continue + raise RuntimeError("Login failed.") + if code: + return self._exchange_authorization_code( + code=code, + redirect_uri=redirect_uri, + code_verifier=code_verifier, + state=state, + expires_in=expires_in, + ) if attempt == 0: - print("Invalid paste. Try again.") + print("Invalid code. Try again.") continue raise RuntimeError("Login failed.") - if code: - return self._exchange_authorization_code( - code=code, - redirect_uri=redirect_uri, - code_verifier=code_verifier, - state=state, - expires_in=expires_in, - ) - if attempt == 0: - print("Invalid paste. Try again.") - continue raise RuntimeError("Login failed.") - raise RuntimeError("Login failed.") + finally: + # stop.set() prevents the reader from starting a new input() call. + # It cannot interrupt an input() already in progress (Python limitation); + # the daemon thread dies with the process. + stop.set() + need_more.set() finally: self.authorize_url = original_authorize_url diff --git a/mobilerun/agent/utils/oauth/gemini_oauth_code_assist_llm.py b/mobilerun/agent/utils/oauth/gemini_oauth_code_assist_llm.py index a4ec51f3..8bfd7bbb 100644 --- a/mobilerun/agent/utils/oauth/gemini_oauth_code_assist_llm.py +++ b/mobilerun/agent/utils/oauth/gemini_oauth_code_assist_llm.py @@ -2,6 +2,7 @@ import hashlib import json import os +import queue import secrets import sys import threading @@ -58,6 +59,10 @@ def _pkce_pair() -> tuple[str, str]: return verifier, challenge +# Keep in sync with _MAX_CODE_ATTEMPTS in anthropic_oauth_llm.py +_MAX_CODE_ATTEMPTS = 2 + + def _is_headless_environment() -> bool: """Detect SSH, WSL, or missing display where browser popups won't work.""" if os.environ.get("SSH_CONNECTION") or os.environ.get("SSH_TTY"): @@ -518,9 +523,19 @@ def login( callback_path: str = "/oauth2callback", prompt_consent: bool = True, ) -> str: + # Headless environments: use authcode redirect flow (no local server) + use_authcode = _is_headless_environment() or os.environ.get( + "DROIDRUN_OAUTH_MANUAL", "" + ).lower() in ("1", "true", "yes") + if use_authcode: + return self.login_headless( + open_browser=open_browser, + timeout_seconds=timeout_seconds, + prompt_consent=prompt_consent, + ) + + # Desktop: browser callback server result: Dict[str, Optional[str]] = {"code": None, "state": None, "error": None} - manual_code: Dict[str, Optional[str]] = {"code": None} - manual_failed = threading.Event() done = threading.Event() expected_state = secrets.token_hex(32) code_verifier, code_challenge = _pkce_pair() @@ -560,10 +575,12 @@ def log_message(self, format: str, *args: Any) -> None: # noqa: A003 except OSError as exc: print( f"Could not bind callback server on {callback_host}:{callback_port} ({exc}). " - "Falling back to manual code entry." + "Falling back to headless code entry." ) - return self.login_manual( - open_browser=open_browser, prompt_consent=prompt_consent + return self.login_headless( + open_browser=open_browser, + timeout_seconds=timeout_seconds, + prompt_consent=prompt_consent, ) actual_port = httpd.server_address[1] @@ -583,94 +600,35 @@ def log_message(self, format: str, *args: Any) -> None: # noqa: A003 if open_browser: webbrowser.open(auth_url) - # Only run the manual-paste race when we can't rely on the local - # browser callback: headless envs (SSH/WSL/no-display), or when the - # user explicitly opts in with DROIDRUN_OAUTH_MANUAL=1. On a normal - # desktop the server always wins anyway, and a blocked input() - # thread would intercept InquirerPy's terminal queries and lag the - # configure wizard. - enable_manual = _is_headless_environment() or os.environ.get( - "DROIDRUN_OAUTH_MANUAL", "" - ).lower() in ("1", "true", "yes") - if enable_manual: - def _read_manual() -> None: - for attempt in range(2): - if done.is_set(): - return - try: - raw = str(input("Or paste the redirect URL / authorization code: ")) - except Exception: - return - if done.is_set(): - return - if not raw.strip(): - if attempt == 0: - print("Invalid paste. Try again.") - continue - if not done.is_set(): - manual_failed.set() - done.set() - return - try: - code = _normalize_manual_code(raw, expected_state) - except Exception: # noqa: BLE001 - if attempt == 0: - print("Invalid paste. Try again.") - continue - print("Invalid paste.") - if not done.is_set(): - manual_failed.set() - done.set() - return - if code: - manual_code["code"] = code - done.set() - return - - manual_thread = threading.Thread(target=_read_manual, daemon=True) - manual_thread.start() - if not done.wait(timeout=timeout_seconds): raise TimeoutError("OAuth login timed out before callback was received.") - if manual_failed.is_set(): - raise RuntimeError("Login failed.") - - if manual_code["code"]: - code_to_exchange = manual_code["code"] - else: - if result["error"]: - raise RuntimeError(f"OAuth callback returned error: {result['error']}") - if result["state"] != expected_state: - raise RuntimeError("OAuth callback state mismatch.") - if not result["code"]: - raise RuntimeError("OAuth callback did not include an authorization code.") - code_to_exchange = result["code"] + if result["error"]: + raise RuntimeError(f"OAuth callback returned error: {result['error']}") + if result["state"] != expected_state: + raise RuntimeError("OAuth callback state mismatch.") + if not result["code"]: + raise RuntimeError("OAuth callback did not include an authorization code.") return self._exchange_authorization_code( - code_to_exchange, redirect_uri, code_verifier=code_verifier + result["code"], redirect_uri, code_verifier=code_verifier ) finally: httpd.shutdown() httpd.server_close() - def login_manual( + def login_headless( self, *, - open_browser: bool = True, + open_browser: bool = False, + timeout_seconds: float = 300.0, input_fn: Any = input, prompt_consent: bool = True, ) -> str: - """Manual OAuth flow for headless/VPS/WSL environments. - - Opens (or prints) the auth URL and prompts the user to paste the - redirected URL or bare authorization code from the browser. - """ + """Headless OAuth flow for SSH/WSL environments.""" code_verifier, code_challenge = _pkce_pair() expected_state = secrets.token_hex(32) - # Google allows any loopback redirect for installed apps. The browser - # will fail to load the page, but the URL bar will contain the code. - redirect_uri = "http://localhost/oauth2callback" + redirect_uri = "https://codeassist.google.com/authcode" auth_url = self._build_auth_url( redirect_uri=redirect_uri, @@ -679,33 +637,75 @@ def login_manual( code_challenge=code_challenge, ) - print(f"Open this URL to login:\n{auth_url}") + print( + f"\nSign in with your Google account:\n" + f"\n1. Open this link in your browser:\n {auth_url}\n" + f"\n2. Complete sign-in, then paste the authorization code shown on the page.\n" + ) if open_browser: webbrowser.open(auth_url) - for attempt in range(2): - raw = str(input_fn("Paste the redirect URL or authorization code: ")) - if not raw.strip(): - if attempt == 0: - print("Invalid paste. Try again.") - continue - raise RuntimeError("Login failed.") - try: - code = _normalize_manual_code(raw, expected_state) - except Exception: # noqa: BLE001 + deadline = time.time() + timeout_seconds + input_queue: queue.Queue[Optional[str]] = queue.Queue() + stop = threading.Event() + need_more = threading.Event() + + def _reader() -> None: + for _ in range(_MAX_CODE_ATTEMPTS): + need_more.wait() + need_more.clear() + if stop.is_set(): + return + try: + input_queue.put(str(input_fn("Enter the authorization code: "))) + except (EOFError, OSError): + input_queue.put(None) + return + + threading.Thread(target=_reader, daemon=True).start() + + try: + for attempt in range(_MAX_CODE_ATTEMPTS): + remaining = deadline - time.time() + if remaining <= 0: + raise TimeoutError("OAuth login timed out.") + + need_more.set() + + try: + raw = input_queue.get(timeout=remaining) + except queue.Empty: + raise TimeoutError("OAuth login timed out.") from None + + if raw is None: + raise RuntimeError("Login failed — stdin closed.") + if not raw.strip(): + if attempt == 0: + print("No code entered. Try again.") + continue + raise RuntimeError("Login failed.") + try: + code = _normalize_manual_code(raw, expected_state) + except Exception: # noqa: BLE001 + if attempt == 0: + print("Invalid code. Try again.") + continue + raise RuntimeError("Login failed.") + if code: + return self._exchange_authorization_code( + code, redirect_uri, code_verifier=code_verifier + ) if attempt == 0: - print("Invalid paste. Try again.") + print("Invalid code. Try again.") continue raise RuntimeError("Login failed.") - if code: - return self._exchange_authorization_code( - code, redirect_uri, code_verifier=code_verifier - ) - if attempt == 0: - print("Invalid paste. Try again.") - continue raise RuntimeError("Login failed.") - raise RuntimeError("Login failed.") + finally: + # stop.set() prevents the reader from starting a new input() call. + # It cannot interrupt an input() already in progress (Python limitation); + # the daemon thread dies with the process. + stop.set() + need_more.set() def _resolve_access_token(self) -> str: env_access_token = os.environ.get("GEMINI_OAUTH_ACCESS_TOKEN") diff --git a/mobilerun/agent/utils/oauth/openai_oauth_llm.py b/mobilerun/agent/utils/oauth/openai_oauth_llm.py index f2159bdc..d7aa6fb6 100644 --- a/mobilerun/agent/utils/oauth/openai_oauth_llm.py +++ b/mobilerun/agent/utils/oauth/openai_oauth_llm.py @@ -73,37 +73,6 @@ def _is_headless_environment() -> bool: return False -def _normalize_manual_code(raw: str, expected_state: str) -> str: - """Parse pasted input: full URL with code= param, code#state, or bare code.""" - value = raw.strip() - if not value: - return value - - first_token = value.split()[0] - - if "error=" in first_token or "code=" in first_token: - parsed = urlparse(first_token) - params = parse_qs(parsed.query) - error = params.get("error", [None])[0] - if error: - desc = params.get("error_description", [error])[0] - raise RuntimeError(f"OAuth error: {desc}") - code = params.get("code", [None])[0] - state_from_url = params.get("state", [None])[0] - if state_from_url and state_from_url != expected_state: - raise RuntimeError("OAuth manual code state mismatch.") - if isinstance(code, str) and code: - return code - - if "#" in first_token: - code_part, fragment = first_token.split("#", 1) - if fragment and fragment != expected_state: - raise RuntimeError("OAuth manual code state mismatch.") - return code_part - - return first_token - - def _tls_preflight(issuer: str, timeout: float = 5.0) -> None: """Probe the OAuth issuer to detect TLS/certificate issues before login. @@ -149,6 +118,100 @@ def _tls_preflight(issuer: str, timeout: float = 5.0) -> None: print(f"Warning: TLS preflight check encountered an error: {exc}") +_DEVICE_CODE_TIMEOUT = 15 * 60 + + +def _request_device_code( + issuer: str, + client_id: str, + http_client: Optional[httpx.Client] = None, + request_timeout: float = 15.0, + retries: int = 2, +) -> dict: + url = f"{issuer.rstrip('/')}/api/accounts/deviceauth/usercode" + post = http_client.post if http_client is not None else httpx.post + for attempt in range(1 + retries): + try: + response = post( + url, + headers={"Content-Type": "application/json"}, + json={"client_id": client_id}, + timeout=request_timeout, + ) + except (httpx.ConnectError, httpx.TimeoutException): + if attempt < retries: + time.sleep(2) + continue + raise + if response.status_code == 404: + raise RuntimeError("Device code login is not enabled for this server.") + if response.status_code >= 500 and attempt < retries: + time.sleep(2) + continue + response.raise_for_status() + return response.json() + raise RuntimeError("Device code request failed after retries.") + + +def _poll_device_code( + issuer: str, + device_auth_id: str, + user_code: str, + interval: int, + http_client: Optional[httpx.Client] = None, + request_timeout: float = 15.0, + timeout_seconds: float = _DEVICE_CODE_TIMEOUT, +) -> dict: + url = f"{issuer.rstrip('/')}/api/accounts/deviceauth/token" + effective_timeout = min(timeout_seconds, _DEVICE_CODE_TIMEOUT) + deadline = time.time() + effective_timeout + post = http_client.post if http_client is not None else httpx.post + + last_error: Optional[str] = None + + while time.time() < deadline: + try: + response = post( + url, + headers={"Content-Type": "application/json"}, + json={"device_auth_id": device_auth_id, "user_code": user_code}, + timeout=request_timeout, + ) + except (httpx.ConnectError, httpx.TimeoutException) as exc: + last_error = str(exc) + remaining = deadline - time.time() + if remaining <= 0: + break + time.sleep(min(interval, remaining)) + continue + if response.status_code == 200: + return response.json() + # OpenAI returns 403/404 while the user hasn't completed browser auth. + # This differs from RFC 8628's 400 + authorization_pending body, but + # matches the observed behaviour of auth.openai.com/api/accounts/deviceauth/token. + # TODO: handle slow_down (RFC 8628 §3.5) by increasing interval. + if response.status_code in (403, 404): + remaining = deadline - time.time() + if remaining <= 0: + break + time.sleep(min(interval, remaining)) + continue + if response.status_code >= 500: + last_error = f"HTTP {response.status_code}" + remaining = deadline - time.time() + if remaining <= 0: + break + time.sleep(min(interval, remaining)) + continue + response.raise_for_status() + + minutes = int(effective_timeout // 60) + msg = f"Device code login timed out ({minutes} minutes)." + if last_error: + msg += f" Last response: {last_error}." + raise TimeoutError(msg) + + @dataclass class OpenAIOAuthCredentials: access_token: str @@ -571,9 +634,15 @@ def login( ) -> OpenAIOAuthCredentials: _tls_preflight(self._oauth_manager.issuer) + # Headless environments: use device code flow (no local server needed) + use_device_code = _is_headless_environment() or os.environ.get( + "DROIDRUN_OAUTH_MANUAL", "" + ).lower() in ("1", "true", "yes") + if use_device_code: + return self._login_device_code(timeout_seconds=timeout_seconds) + + # Desktop: browser callback server result: Dict[str, Optional[str]] = {"code": None, "state": None, "error": None} - manual_code: Dict[str, Optional[str]] = {"code": None} - manual_failed = threading.Event() done = threading.Event() code_verifier, code_challenge = _pkce_pair() state = _b64_no_pad(secrets.token_bytes(32)) @@ -613,15 +682,9 @@ def log_message(self, format: str, *args: Any) -> None: # noqa: A003 except OSError as exc: print( f"Could not bind callback server on {callback_host}:{callback_port} ({exc}). " - "Falling back to manual code entry." - ) - return self.login_manual( - open_browser=open_browser, - callback_port=callback_port, - callback_path=callback_path, - redirect_host=redirect_host, - scope=scope, + "Falling back to device code login." ) + return self._login_device_code(timeout_seconds=timeout_seconds) actual_port = httpd.server_address[1] redirect_uri = f"http://{redirect_host}:{actual_port}{callback_path}" @@ -642,72 +705,18 @@ def log_message(self, format: str, *args: Any) -> None: # noqa: A003 if open_browser: webbrowser.open(auth_url) - # Only run the manual-paste race when we can't rely on the local - # browser callback: headless envs (SSH/WSL/no-display), or when the - # user explicitly opts in with DROIDRUN_OAUTH_MANUAL=1. On a normal - # desktop the server always wins anyway, and a blocked input() - # thread would intercept InquirerPy's terminal queries and lag the - # configure wizard. - enable_manual = _is_headless_environment() or os.environ.get( - "DROIDRUN_OAUTH_MANUAL", "" - ).lower() in ("1", "true", "yes") - if enable_manual: - def _read_manual() -> None: - for attempt in range(2): - if done.is_set(): - return - try: - raw = str(input("Or paste the redirect URL / authorization code: ")) - except Exception: - return - if done.is_set(): - return - if not raw.strip(): - if attempt == 0: - print("Invalid paste. Try again.") - continue - if not done.is_set(): - manual_failed.set() - done.set() - return - try: - code = _normalize_manual_code(raw, state) - except Exception: # noqa: BLE001 - if attempt == 0: - print("Invalid paste. Try again.") - continue - print("Invalid paste.") - if not done.is_set(): - manual_failed.set() - done.set() - return - if code: - manual_code["code"] = code - done.set() - return - - manual_thread = threading.Thread(target=_read_manual, daemon=True) - manual_thread.start() - if not done.wait(timeout=timeout_seconds): raise TimeoutError("OAuth login timed out before callback was received.") - if manual_failed.is_set(): - raise RuntimeError("Login failed.") - - if manual_code["code"]: - code_to_exchange = manual_code["code"] - else: - if result["error"]: - raise RuntimeError(f"OAuth callback returned error: {result['error']}") - if result["state"] != state: - raise RuntimeError("OAuth callback state mismatch.") - if not result["code"]: - raise RuntimeError("OAuth callback did not include an authorization code.") - code_to_exchange = result["code"] + if result["error"]: + raise RuntimeError(f"OAuth callback returned error: {result['error']}") + if result["state"] != state: + raise RuntimeError("OAuth callback state mismatch.") + if not result["code"]: + raise RuntimeError("OAuth callback did not include an authorization code.") creds = self._oauth_manager.exchange_authorization_code( - code=code_to_exchange, + code=result["code"], redirect_uri=redirect_uri, code_verifier=code_verifier, ) @@ -718,66 +727,85 @@ def _read_manual() -> None: httpd.shutdown() httpd.server_close() - def login_manual( + def _login_device_code( self, *, - open_browser: bool = True, - input_fn: Any = input, - callback_port: int = DEFAULT_OPENAI_OAUTH_CALLBACK_PORT, - callback_path: str = DEFAULT_OPENAI_OAUTH_CALLBACK_PATH, - redirect_host: str = DEFAULT_OPENAI_OAUTH_CALLBACK_HOST, - scope: str = DEFAULT_OPENAI_OAUTH_SCOPE, + timeout_seconds: float = _DEVICE_CODE_TIMEOUT, ) -> OpenAIOAuthCredentials: - """Manual OAuth flow for headless/VPS/WSL environments. + """Device Code login for headless/SSH environments. - Uses the same redirect_uri as the browser flow (OpenAI requires - port 1455). The browser will fail to load the redirect page, but - the URL bar will contain the authorization code. + timeout_seconds is capped at 15 min (device code expiry). """ - code_verifier, code_challenge = _pkce_pair() - state = _b64_no_pad(secrets.token_bytes(32)) - redirect_uri = f"http://{redirect_host}:{callback_port}{callback_path}" - auth_url = self._build_auth_url( - issuer=self._oauth_manager.issuer, - client_id=self._oauth_manager.client_id, - redirect_uri=redirect_uri, - code_challenge=code_challenge, - state=state, - scope=scope, + mgr = self._oauth_manager + http_client = mgr.http_client + + device_resp = _request_device_code( + mgr.issuer, mgr.client_id, + http_client=http_client, + request_timeout=mgr.request_timeout, ) + device_auth_id = device_resp.get("device_auth_id") + if not device_auth_id: + raise RuntimeError("Device code response missing 'device_auth_id'.") - print(f"Open this URL to login:\n{auth_url}") - if open_browser: - webbrowser.open(auth_url) - - for attempt in range(2): - raw = str(input_fn("Paste the redirect URL or authorization code: ")) - if not raw.strip(): - if attempt == 0: - print("Invalid paste. Try again.") - continue - raise RuntimeError("Login failed.") - try: - code = _normalize_manual_code(raw, state) - except Exception: # noqa: BLE001 - if attempt == 0: - print("Invalid paste. Try again.") - continue - raise RuntimeError("Login failed.") - if code: - creds = self._oauth_manager.exchange_authorization_code( - code=code, - redirect_uri=redirect_uri, - code_verifier=code_verifier, - ) - if creds.account_id: - object.__setattr__(self, "_oauth_account_id", creds.account_id) - return creds - if attempt == 0: - print("Invalid paste. Try again.") - continue - raise RuntimeError("Login failed.") - raise RuntimeError("Login failed.") + user_code = device_resp.get("user_code") or device_resp.get("usercode") + if not user_code: + raise RuntimeError("Device code response missing 'user_code'.") + try: + interval = int(str(device_resp.get("interval", "5")).strip()) + except (TypeError, ValueError): + interval = 5 + try: + server_expires = int(device_resp["expires_in"]) + except (KeyError, TypeError, ValueError): + server_expires = _DEVICE_CODE_TIMEOUT + effective_timeout = min(timeout_seconds, _DEVICE_CODE_TIMEOUT, server_expires) + verification_url = ( + device_resp.get("verification_uri") + or device_resp.get("verification_url") + or f"{mgr.issuer}/codex/device" + ) + + if effective_timeout >= 60: + mins = int(effective_timeout // 60) + expires_str = f"{mins} minute{'s' if mins != 1 else ''}" + else: + secs = int(effective_timeout) + expires_str = f"{secs} second{'s' if secs != 1 else ''}" + print( + f"\nSign in with your ChatGPT account:\n" + f"\n1. Open this link in your browser:\n {verification_url}\n" + f"\n2. Enter this code (expires in {expires_str}):\n {user_code}\n" + f"\nDevice codes are a common phishing target. Never share this code.\n" + ) + + token_resp = _poll_device_code( + mgr.issuer, + device_auth_id, + user_code, + interval, + http_client=http_client, + request_timeout=mgr.request_timeout, + timeout_seconds=effective_timeout, + ) + + auth_code = token_resp.get("authorization_code") + code_verifier = token_resp.get("code_verifier") + if not auth_code or not code_verifier: + raise RuntimeError( + "Device code token response missing required fields " + "('authorization_code' / 'code_verifier')." + ) + + redirect_uri = f"{mgr.issuer}/deviceauth/callback" + creds = self._oauth_manager.exchange_authorization_code( + code=auth_code, + redirect_uri=redirect_uri, + code_verifier=code_verifier, + ) + if creds.account_id: + object.__setattr__(self, "_oauth_account_id", creds.account_id) + return creds def _ensure_access_token(self) -> OpenAIOAuthCredentials: creds = self._oauth_manager.get_valid_credentials(skew_ms=self._oauth_refresh_skew_ms) diff --git a/mobilerun/cli/main.py b/mobilerun/cli/main.py index 00bb2d32..1ca6b0fa 100644 --- a/mobilerun/cli/main.py +++ b/mobilerun/cli/main.py @@ -1000,13 +1000,15 @@ def anthropic(): @click.option( "--token", default=None, - help="Anthropic setup-token value. If omitted, you will be prompted.", + help="Anthropic setup-token value. If provided, skips the OAuth flow.", ) def anthropic_login(credential_path: str, token: str | None): - """Save an Anthropic setup-token. This is the only supported Anthropic auth flow.""" - token_value = _prompt_anthropic_setup_token(token) - save_anthropic_setup_token(credential_path, token_value) - _print_oauth_login_success("Anthropic setup-token", credential_path) + """Login with Anthropic OAuth. Pass --token to save a setup-token without OAuth.""" + if token: + save_anthropic_setup_token(credential_path, token) + _print_oauth_login_success("Anthropic", credential_path) + else: + _run_anthropic_oauth_login(credential_path=credential_path) @anthropic.command("setup-token")