From 8d92a63626b037f282e72d4bac83eefc843e910f Mon Sep 17 00:00:00 2001 From: Game_Time <108236317+RayBytes@users.noreply.github.com> Date: Fri, 12 Sep 2025 21:53:59 +0500 Subject: [PATCH] allow for diff machine logins --- DOCKER.md | 3 +- chatmock/cli.py | 41 ++++++++++++ chatmock/oauth.py | 167 +++++++++++++++++++++++++++++++++------------- 3 files changed, 163 insertions(+), 48 deletions(-) diff --git a/DOCKER.md b/DOCKER.md index d3aae94..64f28d8 100644 --- a/DOCKER.md +++ b/DOCKER.md @@ -10,7 +10,8 @@ 3) Login: docker compose run --rm --service-ports chatmock-login login - The command prints an auth URL, copy paste it into your browser. - - Server should stop automatically once it recieves the tokens and they are saved. + - If your browser cannot reach the container's localhost callback, copy the full redirect URL from the browser address bar and paste it back into the terminal when prompted. + - Server should stop automatically once it receives the tokens and they are saved. 4) Start the server: docker compose up -d chatmock diff --git a/chatmock/cli.py b/chatmock/cli.py index 80e12dd..7daa0fa 100644 --- a/chatmock/cli.py +++ b/chatmock/cli.py @@ -39,6 +39,47 @@ def cmd_login(no_browser: bool, verbose: bool) -> int: except Exception as e: eprint(f"Failed to open browser: {e}") eprint(f"If your browser did not open, navigate to:\n{auth_url}") + + def _stdin_paste_worker() -> None: + try: + eprint( + "If the browser can't reach this machine, paste the full redirect URL here and press Enter (or leave blank to keep waiting):" + ) + line = sys.stdin.readline().strip() + if not line: + return + try: + from urllib.parse import urlparse, parse_qs + + parsed = urlparse(line) + params = parse_qs(parsed.query) + code = (params.get("code") or [None])[0] + state = (params.get("state") or [None])[0] + if not code: + eprint("Input did not contain an auth code. Ignoring.") + return + if state and state != httpd.state: + eprint("State mismatch. Ignoring pasted URL for safety.") + return + eprint("Received redirect URL. Completing login without callback…") + bundle, _ = httpd.exchange_code(code) + if httpd.persist_auth(bundle): + httpd.exit_code = 0 + eprint("Login successful. Tokens saved.") + else: + eprint("ERROR: Unable to persist auth file.") + httpd.shutdown() + except Exception as exc: + eprint(f"Failed to process pasted redirect URL: {exc}") + except Exception: + pass + + try: + import threading + + threading.Thread(target=_stdin_paste_worker, daemon=True).start() + except Exception: + pass try: httpd.serve_forever() except KeyboardInterrupt: diff --git a/chatmock/oauth.py b/chatmock/oauth.py index 9ba1eff..4461f5e 100644 --- a/chatmock/oauth.py +++ b/chatmock/oauth.py @@ -75,6 +75,125 @@ class OAuthHTTPServer(http.server.HTTPServer): } return f"{self.issuer}/oauth/authorize?" + urllib.parse.urlencode(params) + def exchange_code(self, code: str) -> tuple[AuthBundle, str]: + data = urllib.parse.urlencode( + { + "grant_type": "authorization_code", + "code": code, + "redirect_uri": self.redirect_uri, + "client_id": self.client_id, + "code_verifier": self.pkce.code_verifier, + } + ).encode() + + with urllib.request.urlopen( + urllib.request.Request( + self.token_endpoint, + data=data, + method="POST", + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ), + context=_SSL_CONTEXT, + ) as resp: + payload = json.loads(resp.read().decode()) + + id_token = payload.get("id_token", "") + access_token = payload.get("access_token", "") + refresh_token = payload.get("refresh_token", "") + + id_token_claims = parse_jwt_claims(id_token) + access_token_claims = parse_jwt_claims(access_token) + + auth_claims = (id_token_claims or {}).get("https://api.openai.com/auth", {}) + chatgpt_account_id = auth_claims.get("chatgpt_account_id", "") + + token_data = TokenData( + id_token=id_token, + access_token=access_token, + refresh_token=refresh_token, + account_id=chatgpt_account_id, + ) + + api_key, success_url = self.maybe_obtain_api_key( + id_token_claims or {}, access_token_claims or {}, token_data + ) + + last_refresh_str = ( + datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z") + ) + bundle = AuthBundle(api_key=api_key, token_data=token_data, last_refresh=last_refresh_str) + return bundle, success_url or f"{URL_BASE}/success" + + def maybe_obtain_api_key( + self, + token_claims: Dict[str, Any], + access_claims: Dict[str, Any], + token_data: TokenData, + ) -> tuple[str | None, str | None]: + org_id = token_claims.get("organization_id") + project_id = token_claims.get("project_id") + if not org_id or not project_id: + query = { + "id_token": token_data.id_token, + "needs_setup": "false", + "org_id": org_id or "", + "project_id": project_id or "", + "plan_type": access_claims.get("chatgpt_plan_type"), + "platform_url": "https://platform.openai.com", + } + return None, f"{URL_BASE}/success?{urllib.parse.urlencode(query)}" + + today = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d") + exchange_data = urllib.parse.urlencode( + { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "client_id": self.client_id, + "requested_token": "openai-api-key", + "subject_token": token_data.id_token, + "subject_token_type": "urn:ietf:params:oauth:token-type:id_token", + "name": f"ChatGPT Local [auto-generated] ({today})", + } + ).encode() + + with urllib.request.urlopen( + urllib.request.Request( + self.token_endpoint, + data=exchange_data, + method="POST", + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ), + context=_SSL_CONTEXT, + ) as resp: + exchange_payload = json.loads(resp.read().decode()) + exchanged_access_token = exchange_payload.get("access_token") + + chatgpt_plan_type = access_claims.get("chatgpt_plan_type") + success_url_query = { + "id_token": token_data.id_token, + "access_token": token_data.access_token, + "refresh_token": token_data.refresh_token, + "exchanged_access_token": exchanged_access_token, + "org_id": org_id, + "project_id": project_id, + "plan_type": chatgpt_plan_type, + "platform_url": "https://platform.openai.com", + } + success_url = f"{URL_BASE}/success?{urllib.parse.urlencode(success_url_query)}" + return exchanged_access_token, success_url + + def persist_auth(self, bundle: AuthBundle) -> bool: + auth_json_contents = { + "OPENAI_API_KEY": bundle.api_key, + "tokens": { + "id_token": bundle.token_data.id_token, + "access_token": bundle.token_data.access_token, + "refresh_token": bundle.token_data.refresh_token, + "account_id": bundle.token_data.account_id, + }, + "last_refresh": bundle.last_refresh, + } + return write_auth_file(auth_json_contents) + class OAuthHandler(http.server.BaseHTTPRequestHandler): server: "OAuthHTTPServer" @@ -162,53 +281,7 @@ class OAuthHandler(http.server.BaseHTTPRequestHandler): threading.Thread(target=_later, daemon=True).start() def _exchange_code(self, code: str) -> Tuple[AuthBundle, str]: - data = urllib.parse.urlencode( - { - "grant_type": "authorization_code", - "code": code, - "redirect_uri": self.server.redirect_uri, - "client_id": self.server.client_id, - "code_verifier": self.server.pkce.code_verifier, - } - ).encode() - - with urllib.request.urlopen( - urllib.request.Request( - self.server.token_endpoint, - data=data, - method="POST", - headers={"Content-Type": "application/x-www-form-urlencoded"}, - ), - context=_SSL_CONTEXT, - ) as resp: - payload = json.loads(resp.read().decode()) - - id_token = payload.get("id_token", "") - access_token = payload.get("access_token", "") - refresh_token = payload.get("refresh_token", "") - - id_token_claims = parse_jwt_claims(id_token) - access_token_claims = parse_jwt_claims(access_token) - - auth_claims = (id_token_claims or {}).get("https://api.openai.com/auth", {}) - chatgpt_account_id = auth_claims.get("chatgpt_account_id", "") - - token_data = TokenData( - id_token=id_token, - access_token=access_token, - refresh_token=refresh_token, - account_id=chatgpt_account_id, - ) - - api_key, success_url = self._maybe_obtain_api_key( - id_token_claims or {}, access_token_claims or {}, token_data - ) - - last_refresh_str = ( - datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z") - ) - bundle = AuthBundle(api_key=api_key, token_data=token_data, last_refresh=last_refresh_str) - return bundle, success_url or f"{URL_BASE}/success" + return self.server.exchange_code(code) def _maybe_obtain_api_key( self,