allow for diff machine logins
This commit is contained in:
@@ -10,7 +10,8 @@
|
|||||||
3) Login:
|
3) Login:
|
||||||
docker compose run --rm --service-ports chatmock-login login
|
docker compose run --rm --service-ports chatmock-login login
|
||||||
- The command prints an auth URL, copy paste it into your browser.
|
- The command prints an auth URL, copy paste it into your browser.
|
||||||
- Server should stop automatically once it recieves the tokens and they are saved.
|
- If your browser cannot reach the container's localhost callback, copy the full redirect URL from the browser address bar and paste it back into the terminal when prompted.
|
||||||
|
- Server should stop automatically once it receives the tokens and they are saved.
|
||||||
|
|
||||||
4) Start the server:
|
4) Start the server:
|
||||||
docker compose up -d chatmock
|
docker compose up -d chatmock
|
||||||
|
|||||||
@@ -39,6 +39,47 @@ def cmd_login(no_browser: bool, verbose: bool) -> int:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
eprint(f"Failed to open browser: {e}")
|
eprint(f"Failed to open browser: {e}")
|
||||||
eprint(f"If your browser did not open, navigate to:\n{auth_url}")
|
eprint(f"If your browser did not open, navigate to:\n{auth_url}")
|
||||||
|
|
||||||
|
def _stdin_paste_worker() -> None:
|
||||||
|
try:
|
||||||
|
eprint(
|
||||||
|
"If the browser can't reach this machine, paste the full redirect URL here and press Enter (or leave blank to keep waiting):"
|
||||||
|
)
|
||||||
|
line = sys.stdin.readline().strip()
|
||||||
|
if not line:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
from urllib.parse import urlparse, parse_qs
|
||||||
|
|
||||||
|
parsed = urlparse(line)
|
||||||
|
params = parse_qs(parsed.query)
|
||||||
|
code = (params.get("code") or [None])[0]
|
||||||
|
state = (params.get("state") or [None])[0]
|
||||||
|
if not code:
|
||||||
|
eprint("Input did not contain an auth code. Ignoring.")
|
||||||
|
return
|
||||||
|
if state and state != httpd.state:
|
||||||
|
eprint("State mismatch. Ignoring pasted URL for safety.")
|
||||||
|
return
|
||||||
|
eprint("Received redirect URL. Completing login without callback…")
|
||||||
|
bundle, _ = httpd.exchange_code(code)
|
||||||
|
if httpd.persist_auth(bundle):
|
||||||
|
httpd.exit_code = 0
|
||||||
|
eprint("Login successful. Tokens saved.")
|
||||||
|
else:
|
||||||
|
eprint("ERROR: Unable to persist auth file.")
|
||||||
|
httpd.shutdown()
|
||||||
|
except Exception as exc:
|
||||||
|
eprint(f"Failed to process pasted redirect URL: {exc}")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
try:
|
||||||
|
import threading
|
||||||
|
|
||||||
|
threading.Thread(target=_stdin_paste_worker, daemon=True).start()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
try:
|
try:
|
||||||
httpd.serve_forever()
|
httpd.serve_forever()
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
|
|||||||
@@ -75,6 +75,125 @@ class OAuthHTTPServer(http.server.HTTPServer):
|
|||||||
}
|
}
|
||||||
return f"{self.issuer}/oauth/authorize?" + urllib.parse.urlencode(params)
|
return f"{self.issuer}/oauth/authorize?" + urllib.parse.urlencode(params)
|
||||||
|
|
||||||
|
def exchange_code(self, code: str) -> tuple[AuthBundle, str]:
|
||||||
|
data = urllib.parse.urlencode(
|
||||||
|
{
|
||||||
|
"grant_type": "authorization_code",
|
||||||
|
"code": code,
|
||||||
|
"redirect_uri": self.redirect_uri,
|
||||||
|
"client_id": self.client_id,
|
||||||
|
"code_verifier": self.pkce.code_verifier,
|
||||||
|
}
|
||||||
|
).encode()
|
||||||
|
|
||||||
|
with urllib.request.urlopen(
|
||||||
|
urllib.request.Request(
|
||||||
|
self.token_endpoint,
|
||||||
|
data=data,
|
||||||
|
method="POST",
|
||||||
|
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||||
|
),
|
||||||
|
context=_SSL_CONTEXT,
|
||||||
|
) as resp:
|
||||||
|
payload = json.loads(resp.read().decode())
|
||||||
|
|
||||||
|
id_token = payload.get("id_token", "")
|
||||||
|
access_token = payload.get("access_token", "")
|
||||||
|
refresh_token = payload.get("refresh_token", "")
|
||||||
|
|
||||||
|
id_token_claims = parse_jwt_claims(id_token)
|
||||||
|
access_token_claims = parse_jwt_claims(access_token)
|
||||||
|
|
||||||
|
auth_claims = (id_token_claims or {}).get("https://api.openai.com/auth", {})
|
||||||
|
chatgpt_account_id = auth_claims.get("chatgpt_account_id", "")
|
||||||
|
|
||||||
|
token_data = TokenData(
|
||||||
|
id_token=id_token,
|
||||||
|
access_token=access_token,
|
||||||
|
refresh_token=refresh_token,
|
||||||
|
account_id=chatgpt_account_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
api_key, success_url = self.maybe_obtain_api_key(
|
||||||
|
id_token_claims or {}, access_token_claims or {}, token_data
|
||||||
|
)
|
||||||
|
|
||||||
|
last_refresh_str = (
|
||||||
|
datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z")
|
||||||
|
)
|
||||||
|
bundle = AuthBundle(api_key=api_key, token_data=token_data, last_refresh=last_refresh_str)
|
||||||
|
return bundle, success_url or f"{URL_BASE}/success"
|
||||||
|
|
||||||
|
def maybe_obtain_api_key(
|
||||||
|
self,
|
||||||
|
token_claims: Dict[str, Any],
|
||||||
|
access_claims: Dict[str, Any],
|
||||||
|
token_data: TokenData,
|
||||||
|
) -> tuple[str | None, str | None]:
|
||||||
|
org_id = token_claims.get("organization_id")
|
||||||
|
project_id = token_claims.get("project_id")
|
||||||
|
if not org_id or not project_id:
|
||||||
|
query = {
|
||||||
|
"id_token": token_data.id_token,
|
||||||
|
"needs_setup": "false",
|
||||||
|
"org_id": org_id or "",
|
||||||
|
"project_id": project_id or "",
|
||||||
|
"plan_type": access_claims.get("chatgpt_plan_type"),
|
||||||
|
"platform_url": "https://platform.openai.com",
|
||||||
|
}
|
||||||
|
return None, f"{URL_BASE}/success?{urllib.parse.urlencode(query)}"
|
||||||
|
|
||||||
|
today = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d")
|
||||||
|
exchange_data = urllib.parse.urlencode(
|
||||||
|
{
|
||||||
|
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
|
||||||
|
"client_id": self.client_id,
|
||||||
|
"requested_token": "openai-api-key",
|
||||||
|
"subject_token": token_data.id_token,
|
||||||
|
"subject_token_type": "urn:ietf:params:oauth:token-type:id_token",
|
||||||
|
"name": f"ChatGPT Local [auto-generated] ({today})",
|
||||||
|
}
|
||||||
|
).encode()
|
||||||
|
|
||||||
|
with urllib.request.urlopen(
|
||||||
|
urllib.request.Request(
|
||||||
|
self.token_endpoint,
|
||||||
|
data=exchange_data,
|
||||||
|
method="POST",
|
||||||
|
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||||
|
),
|
||||||
|
context=_SSL_CONTEXT,
|
||||||
|
) as resp:
|
||||||
|
exchange_payload = json.loads(resp.read().decode())
|
||||||
|
exchanged_access_token = exchange_payload.get("access_token")
|
||||||
|
|
||||||
|
chatgpt_plan_type = access_claims.get("chatgpt_plan_type")
|
||||||
|
success_url_query = {
|
||||||
|
"id_token": token_data.id_token,
|
||||||
|
"access_token": token_data.access_token,
|
||||||
|
"refresh_token": token_data.refresh_token,
|
||||||
|
"exchanged_access_token": exchanged_access_token,
|
||||||
|
"org_id": org_id,
|
||||||
|
"project_id": project_id,
|
||||||
|
"plan_type": chatgpt_plan_type,
|
||||||
|
"platform_url": "https://platform.openai.com",
|
||||||
|
}
|
||||||
|
success_url = f"{URL_BASE}/success?{urllib.parse.urlencode(success_url_query)}"
|
||||||
|
return exchanged_access_token, success_url
|
||||||
|
|
||||||
|
def persist_auth(self, bundle: AuthBundle) -> bool:
|
||||||
|
auth_json_contents = {
|
||||||
|
"OPENAI_API_KEY": bundle.api_key,
|
||||||
|
"tokens": {
|
||||||
|
"id_token": bundle.token_data.id_token,
|
||||||
|
"access_token": bundle.token_data.access_token,
|
||||||
|
"refresh_token": bundle.token_data.refresh_token,
|
||||||
|
"account_id": bundle.token_data.account_id,
|
||||||
|
},
|
||||||
|
"last_refresh": bundle.last_refresh,
|
||||||
|
}
|
||||||
|
return write_auth_file(auth_json_contents)
|
||||||
|
|
||||||
|
|
||||||
class OAuthHandler(http.server.BaseHTTPRequestHandler):
|
class OAuthHandler(http.server.BaseHTTPRequestHandler):
|
||||||
server: "OAuthHTTPServer"
|
server: "OAuthHTTPServer"
|
||||||
@@ -162,53 +281,7 @@ class OAuthHandler(http.server.BaseHTTPRequestHandler):
|
|||||||
threading.Thread(target=_later, daemon=True).start()
|
threading.Thread(target=_later, daemon=True).start()
|
||||||
|
|
||||||
def _exchange_code(self, code: str) -> Tuple[AuthBundle, str]:
|
def _exchange_code(self, code: str) -> Tuple[AuthBundle, str]:
|
||||||
data = urllib.parse.urlencode(
|
return self.server.exchange_code(code)
|
||||||
{
|
|
||||||
"grant_type": "authorization_code",
|
|
||||||
"code": code,
|
|
||||||
"redirect_uri": self.server.redirect_uri,
|
|
||||||
"client_id": self.server.client_id,
|
|
||||||
"code_verifier": self.server.pkce.code_verifier,
|
|
||||||
}
|
|
||||||
).encode()
|
|
||||||
|
|
||||||
with urllib.request.urlopen(
|
|
||||||
urllib.request.Request(
|
|
||||||
self.server.token_endpoint,
|
|
||||||
data=data,
|
|
||||||
method="POST",
|
|
||||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
|
||||||
),
|
|
||||||
context=_SSL_CONTEXT,
|
|
||||||
) as resp:
|
|
||||||
payload = json.loads(resp.read().decode())
|
|
||||||
|
|
||||||
id_token = payload.get("id_token", "")
|
|
||||||
access_token = payload.get("access_token", "")
|
|
||||||
refresh_token = payload.get("refresh_token", "")
|
|
||||||
|
|
||||||
id_token_claims = parse_jwt_claims(id_token)
|
|
||||||
access_token_claims = parse_jwt_claims(access_token)
|
|
||||||
|
|
||||||
auth_claims = (id_token_claims or {}).get("https://api.openai.com/auth", {})
|
|
||||||
chatgpt_account_id = auth_claims.get("chatgpt_account_id", "")
|
|
||||||
|
|
||||||
token_data = TokenData(
|
|
||||||
id_token=id_token,
|
|
||||||
access_token=access_token,
|
|
||||||
refresh_token=refresh_token,
|
|
||||||
account_id=chatgpt_account_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
api_key, success_url = self._maybe_obtain_api_key(
|
|
||||||
id_token_claims or {}, access_token_claims or {}, token_data
|
|
||||||
)
|
|
||||||
|
|
||||||
last_refresh_str = (
|
|
||||||
datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z")
|
|
||||||
)
|
|
||||||
bundle = AuthBundle(api_key=api_key, token_data=token_data, last_refresh=last_refresh_str)
|
|
||||||
return bundle, success_url or f"{URL_BASE}/success"
|
|
||||||
|
|
||||||
def _maybe_obtain_api_key(
|
def _maybe_obtain_api_key(
|
||||||
self,
|
self,
|
||||||
|
|||||||
Reference in New Issue
Block a user