875 lines
37 KiB
Python
875 lines
37 KiB
Python
from __future__ import annotations
|
|
|
|
import base64
|
|
import datetime
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import secrets
|
|
import sys
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
import requests
|
|
|
|
from .config import CLIENT_ID_DEFAULT, OAUTH_TOKEN_URL
|
|
|
|
|
|
def eprint(*args, **kwargs) -> None:
    """Print to standard error.

    Accepts exactly the same positional and keyword arguments as ``print`` (except
    ``file``, which is fixed to ``sys.stderr``).
    """
    stream = sys.stderr
    print(*args, file=stream, **kwargs)
|
|
|
|
|
|
def get_home_dir() -> str:
    """Return the directory used for local state such as ``auth.json``.

    ``$CHATGPT_LOCAL_HOME`` wins, then ``$CODEX_HOME``; empty or unset variables are
    ignored. The fallback is ``~/.chatgpt-local`` with ``~`` expanded.
    """
    configured = os.getenv("CHATGPT_LOCAL_HOME") or os.getenv("CODEX_HOME")
    return configured if configured else os.path.expanduser("~/.chatgpt-local")
|
|
|
|
|
|
def read_auth_file() -> Dict[str, Any] | None:
|
|
for base in [
|
|
os.getenv("CHATGPT_LOCAL_HOME"),
|
|
os.getenv("CODEX_HOME"),
|
|
os.path.expanduser("~/.chatgpt-local"),
|
|
os.path.expanduser("~/.codex"),
|
|
]:
|
|
if not base:
|
|
continue
|
|
path = os.path.join(base, "auth.json")
|
|
try:
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
return json.load(f)
|
|
except FileNotFoundError:
|
|
continue
|
|
except Exception:
|
|
continue
|
|
return None
|
|
|
|
|
|
def write_auth_file(auth: Dict[str, Any]) -> bool:
    """Persist ``auth`` as ``auth.json`` in the directory returned by ``get_home_dir()``.

    The directory is created if needed. The JSON (indented by 2) is written to a
    temporary file in the same directory and then moved over ``auth.json`` with
    ``os.replace``:

    - ``tempfile.mkstemp`` creates the file readable/writable only by the current
      user, so the credentials are never briefly exposed with default permissions.
    - Writing to a temporary file first means a failure mid-write (e.g. a value
      ``json`` cannot serialize) cannot leave a truncated ``auth.json`` behind.

    Returns:
        True on success. On failure an error is printed to stderr and False is
        returned.
    """
    import tempfile

    home = get_home_dir()
    try:
        os.makedirs(home, exist_ok=True)
    except Exception as exc:
        eprint(f"ERROR: unable to create auth home directory {home}: {exc}")
        return False

    path = os.path.join(home, "auth.json")
    tmp_path = None
    try:
        fd, tmp_path = tempfile.mkstemp(prefix=".auth-", suffix=".json.tmp", dir=home)
        with os.fdopen(fd, "w", encoding="utf-8") as fp:
            json.dump(auth, fp, indent=2)
        os.replace(tmp_path, path)
        tmp_path = None  # ownership passed to auth.json; nothing to clean up
        return True
    except Exception as exc:
        eprint(f"ERROR: unable to write auth file: {exc}")
        return False
    finally:
        if tmp_path is not None:
            # Remove the partially written temp file; the previous auth.json is untouched.
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
|
|
|
|
|
|
def parse_jwt_claims(token: str) -> Dict[str, Any] | None:
|
|
if not token or token.count(".") != 2:
|
|
return None
|
|
try:
|
|
_, payload, _ = token.split(".")
|
|
padded = payload + "=" * (-len(payload) % 4)
|
|
data = base64.urlsafe_b64decode(padded.encode())
|
|
return json.loads(data.decode())
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def generate_pkce() -> "PkceCodes":
    """Create a fresh PKCE verifier/challenge pair.

    The verifier is 128 random hex characters (``secrets.token_hex(64)``). The
    challenge is the unpadded base64url encoding of the verifier's SHA-256 digest,
    i.e. the ``S256`` method.
    """
    from .models import PkceCodes

    verifier = secrets.token_hex(64)
    challenge_digest = hashlib.sha256(verifier.encode()).digest()
    challenge = base64.urlsafe_b64encode(challenge_digest).decode().rstrip("=")
    return PkceCodes(code_verifier=verifier, code_challenge=challenge)
|
|
|
|
|
|
def convert_chat_messages_to_responses_input(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Convert Chat Completions ``messages`` into Responses API ``input`` items.

    Mapping rules:

    - Non-dict entries and ``system`` messages are skipped.
    - ``tool`` messages with a non-empty string ``tool_call_id`` (or ``id``) become
      ``function_call_output`` items.
      - List content is flattened to its text parts joined with newlines.
      - ``null`` content becomes an empty output, so the matching function call
        still receives an answer.
      - Tool messages never become chat messages.
    - ``function`` tool calls on ``assistant`` messages become ``function_call``
      items, placed before that message's own content.
    - Remaining text/image content becomes a ``message`` item:
      - role ``assistant`` for assistant messages, ``user`` for everything else;
      - assistant text uses ``output_text``, other text ``input_text``;
      - images become ``input_image``, with base64 data URLs normalised.
    """
    from urllib.parse import unquote

    def _normalize_image_data_url(url: str) -> str:
        """Re-encode a base64 ``data:image/...`` URL as standard, padded base64.

        Percent-escapes, surrounding whitespace, CR/LF and URL-safe characters
        (``-``/``_``) are normalised. If the result still is not valid base64, the
        original URL is returned. Non-data URLs pass through unchanged.
        """
        if not isinstance(url, str) or not url.startswith("data:image/") or ";base64," not in url:
            return url
        header, data = url.split(",", 1)
        data = unquote(data).strip().replace("\n", "").replace("\r", "")
        data = data.replace("-", "+").replace("_", "/")
        data += "=" * (-len(data) % 4)
        try:
            base64.b64decode(data, validate=True)
        except ValueError:  # binascii.Error subclasses ValueError
            return url
        return f"{header},{data}"

    def _tool_output(content: Any) -> Optional[str]:
        """Return the output string for a tool message, or None if it cannot be represented."""
        if content is None:
            return ""
        if isinstance(content, list):
            texts = []
            for part in content:
                if isinstance(part, dict):
                    text = part.get("text") or part.get("content")
                    if isinstance(text, str) and text:
                        texts.append(text)
            return "\n".join(texts)
        return content if isinstance(content, str) else None

    def _function_call_items(tool_calls: List[Any]) -> List[Dict[str, Any]]:
        """Translate an assistant message's ``tool_calls`` into ``function_call`` items."""
        items: List[Dict[str, Any]] = []
        for tc in tool_calls:
            if not isinstance(tc, dict) or tc.get("type", "function") != "function":
                continue
            call_id = tc.get("id") or tc.get("call_id")
            fn = tc.get("function")
            if not isinstance(fn, dict):
                fn = {}
            name = fn.get("name")
            args = fn.get("arguments")
            if isinstance(call_id, str) and isinstance(name, str) and isinstance(args, str):
                items.append({"type": "function_call", "name": name, "arguments": args, "call_id": call_id})
        return items

    def _content_items(role: Any, content: Any) -> List[Dict[str, Any]]:
        """Translate message content (string or list of parts) into Responses content items."""
        text_kind = "output_text" if role == "assistant" else "input_text"
        if isinstance(content, str):
            return [{"type": text_kind, "text": content}] if content else []
        if not isinstance(content, list):
            return []
        items: List[Dict[str, Any]] = []
        for part in content:
            if not isinstance(part, dict):
                continue
            ptype = part.get("type")
            if ptype == "text":
                text = part.get("text") or part.get("content") or ""
                if isinstance(text, str) and text:
                    items.append({"type": text_kind, "text": text})
            elif ptype == "image_url":
                image = part.get("image_url")
                url = image.get("url") if isinstance(image, dict) else image
                if isinstance(url, str) and url:
                    items.append({"type": "input_image", "image_url": _normalize_image_data_url(url)})
        return items

    input_items: List[Dict[str, Any]] = []
    for message in messages:
        if not isinstance(message, dict):
            continue
        role = message.get("role")
        if role == "system":
            continue

        if role == "tool":
            call_id = message.get("tool_call_id") or message.get("id")
            if isinstance(call_id, str) and call_id:
                output = _tool_output(message.get("content", ""))
                if output is not None:
                    input_items.append({"type": "function_call_output", "call_id": call_id, "output": output})
            continue

        if role == "assistant" and isinstance(message.get("tool_calls"), list):
            input_items.extend(_function_call_items(message["tool_calls"]))

        content_items = _content_items(role, message.get("content", ""))
        if content_items:
            role_out = "assistant" if role == "assistant" else "user"
            input_items.append({"type": "message", "role": role_out, "content": content_items})
    return input_items
|
|
|
|
|
|
def convert_tools_chat_to_responses(tools: Any) -> List[Dict[str, Any]]:
    """Translate Chat Completions ``tools`` into Responses API function tool specs.

    Only ``{"type": "function", "function": {...}}`` entries with a non-empty string
    name are kept. The description defaults to ``""``. A missing or non-dict
    ``parameters`` becomes an empty object schema. ``strict`` is always False.
    Anything other than a list yields an empty list.
    """
    if not isinstance(tools, list):
        return []

    converted: List[Dict[str, Any]] = []
    for tool in tools:
        if not isinstance(tool, dict) or tool.get("type") != "function":
            continue
        spec = tool.get("function")
        if not isinstance(spec, dict):
            spec = {}
        name = spec.get("name")
        if not name or not isinstance(name, str):
            continue
        schema = spec.get("parameters")
        converted.append(
            {
                "type": "function",
                "name": name,
                "description": spec.get("description") or "",
                "strict": False,
                "parameters": schema if isinstance(schema, dict) else {"type": "object", "properties": {}},
            }
        )
    return converted
|
|
|
|
|
|
def load_chatgpt_tokens(ensure_fresh: bool = True) -> tuple[str | None, str | None, str | None]:
    """Return ``(access_token, account_id, id_token)`` from the stored auth file.

    When ``ensure_fresh`` is true, a refresh token is stored and ``CLIENT_ID_DEFAULT``
    is set, the tokens are refreshed if ``_should_refresh_access_token`` says so (or
    there is no access token). A successful refresh is merged over the stored tokens
    and written back via ``_persist_refreshed_auth``; a failed refresh silently keeps
    the stored values.

    A missing ``account_id`` is derived from the ID token. Every returned element is
    a non-empty string or None; all three are None when no auth file is available.
    """
    auth = read_auth_file()
    if not isinstance(auth, dict):
        return None, None, None

    stored = auth.get("tokens")
    if not isinstance(stored, dict):
        stored = {}
    access_token = stored.get("access_token")
    account_id = stored.get("account_id")
    id_token = stored.get("id_token")
    refresh_token = stored.get("refresh_token")

    can_refresh = bool(ensure_fresh and isinstance(refresh_token, str) and refresh_token and CLIENT_ID_DEFAULT)
    if can_refresh:
        have_access = isinstance(access_token, str) and bool(access_token)
        stale = _should_refresh_access_token(access_token, auth.get("last_refresh"))
        if stale or not have_access:
            fresh = _refresh_chatgpt_tokens(refresh_token, CLIENT_ID_DEFAULT)
            if fresh:
                access_token = fresh.get("access_token") or access_token
                id_token = fresh.get("id_token") or id_token
                refresh_token = fresh.get("refresh_token") or refresh_token
                account_id = fresh.get("account_id") or account_id

                merged = dict(stored)
                for key, value in (
                    ("access_token", access_token),
                    ("id_token", id_token),
                    ("refresh_token", refresh_token),
                    ("account_id", account_id),
                ):
                    if isinstance(value, str) and value:
                        merged[key] = value
                # Best effort: a failed write is reported by the helper and the
                # freshly obtained tokens are still returned below.
                _persist_refreshed_auth(auth, merged)

    if not (isinstance(account_id, str) and account_id):
        account_id = _derive_account_id(id_token)

    def _non_empty(value: Any) -> str | None:
        return value if isinstance(value, str) and value else None

    return _non_empty(access_token), _non_empty(account_id), _non_empty(id_token)
|
|
|
|
|
|
def _should_refresh_access_token(access_token: Optional[str], last_refresh: Any) -> bool:
|
|
if not isinstance(access_token, str) or not access_token:
|
|
return True
|
|
|
|
claims = parse_jwt_claims(access_token) or {}
|
|
exp = claims.get("exp") if isinstance(claims, dict) else None
|
|
now = datetime.datetime.now(datetime.timezone.utc)
|
|
if isinstance(exp, (int, float)):
|
|
try:
|
|
expiry = datetime.datetime.fromtimestamp(float(exp), datetime.timezone.utc)
|
|
except (OverflowError, OSError, ValueError):
|
|
expiry = None
|
|
if expiry is not None:
|
|
return expiry <= now + datetime.timedelta(minutes=5)
|
|
|
|
if isinstance(last_refresh, str):
|
|
refreshed_at = _parse_iso8601(last_refresh)
|
|
if refreshed_at is not None:
|
|
return refreshed_at <= now - datetime.timedelta(minutes=55)
|
|
return False
|
|
|
|
|
|
def _refresh_chatgpt_tokens(refresh_token: str, client_id: str) -> Optional[Dict[str, Optional[str]]]:
    """Exchange ``refresh_token`` for new tokens at ``OAUTH_TOKEN_URL``.

    Sends a JSON ``grant_type=refresh_token`` request (30 s timeout).

    Returns:
        A dict with these keys, or None after printing an error on any network,
        HTTP (status >= 400) or response-format failure:

        - ``access_token``: a non-empty string; required.
        - ``id_token``: a string, or None when the response omits it. OpenID Connect
          allows a refresh response to omit the ID token, so its absence is not
          treated as a failure.
        - ``refresh_token``: the rotated token if the server returned one, else the
          input token.
        - ``account_id``: derived from the ID token, or None.
    """
    payload = {
        "grant_type": "refresh_token",
        "refresh_token": refresh_token,
        "client_id": client_id,
        "scope": "openid profile email offline_access",
    }

    try:
        resp = requests.post(OAUTH_TOKEN_URL, json=payload, timeout=30)
    except requests.RequestException as exc:
        eprint(f"ERROR: failed to refresh ChatGPT token: {exc}")
        return None

    if resp.status_code >= 400:
        eprint(f"ERROR: refresh token request returned status {resp.status_code}")
        return None

    try:
        data = resp.json()
    except ValueError as exc:
        eprint(f"ERROR: unable to parse refresh token response: {exc}")
        return None
    if not isinstance(data, dict):
        eprint("ERROR: refresh token response was not a JSON object")
        return None

    access_token = data.get("access_token")
    if not isinstance(access_token, str) or not access_token:
        eprint("ERROR: refresh token response missing expected tokens")
        return None

    id_token = data.get("id_token")
    if not isinstance(id_token, str) or not id_token:
        id_token = None

    new_refresh_token = data.get("refresh_token")
    if not isinstance(new_refresh_token, str) or not new_refresh_token:
        new_refresh_token = refresh_token

    return {
        "id_token": id_token,
        "access_token": access_token,
        "refresh_token": new_refresh_token,
        "account_id": _derive_account_id(id_token),
    }
|
|
|
|
|
|
def _persist_refreshed_auth(auth: Dict[str, Any], updated_tokens: Dict[str, Any]) -> Optional[Tuple[Dict[str, Any], Dict[str, Any]]]:
    """Write ``auth`` back to disk with ``updated_tokens`` and a fresh ``last_refresh``.

    ``auth`` itself is not mutated; a shallow copy is written.

    Returns:
        ``(new_auth, updated_tokens)`` on success. On failure an error is printed
        and None is returned.
    """
    candidate = {**auth, "tokens": updated_tokens, "last_refresh": _now_iso8601()}
    if not write_auth_file(candidate):
        eprint("ERROR: unable to persist refreshed auth tokens")
        return None
    return candidate, updated_tokens
|
|
|
|
|
|
def _derive_account_id(id_token: Optional[str]) -> Optional[str]:
|
|
if not isinstance(id_token, str) or not id_token:
|
|
return None
|
|
claims = parse_jwt_claims(id_token) or {}
|
|
auth_claims = claims.get("https://api.openai.com/auth") if isinstance(claims, dict) else None
|
|
if isinstance(auth_claims, dict):
|
|
account_id = auth_claims.get("chatgpt_account_id")
|
|
if isinstance(account_id, str) and account_id:
|
|
return account_id
|
|
return None
|
|
|
|
|
|
def _parse_iso8601(value: str) -> Optional[datetime.datetime]:
|
|
try:
|
|
if value.endswith("Z"):
|
|
value = value[:-1] + "+00:00"
|
|
dt = datetime.datetime.fromisoformat(value)
|
|
if dt.tzinfo is None:
|
|
dt = dt.replace(tzinfo=datetime.timezone.utc)
|
|
return dt.astimezone(datetime.timezone.utc)
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _now_iso8601() -> str:
|
|
return datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z")
|
|
|
|
|
|
def get_effective_chatgpt_auth() -> tuple[str | None, str | None]:
    """Return ``(access_token, account_id)`` for ChatGPT requests.

    Tokens come from ``load_chatgpt_tokens()``, which may refresh them. When no
    account id is returned, one is derived from the ID token.
    """
    access_token, account_id, id_token = load_chatgpt_tokens()
    return access_token, account_id or _derive_account_id(id_token)
|
|
|
|
|
|
def sse_translate_chat(
    upstream,
    model: str,
    created: int,
    verbose: bool = False,
    vlog=None,
    reasoning_compat: str = "think-tags",
    *,
    include_usage: bool = False,
):
    """Translate an upstream Responses API SSE stream into Chat Completions stream chunks.

    This is a generator of ``bytes`` SSE frames, each ``data: <json>`` followed by a
    blank line, carrying ``chat.completion.chunk`` objects. It ends with a
    ``data: [DONE]`` frame once ``response.completed`` arrives.

    Translation rules:

    - ``response.output_text.delta`` becomes ``delta.content``.
    - Reasoning deltas (``response.reasoning_summary_text.delta`` and
      ``response.reasoning_text.delta``) depend on ``reasoning_compat``:
      - ``"think-tags"`` wraps them in ``<think>...</think>`` inside
        ``delta.content``; reasoning arriving after the first output text is dropped.
      - ``"o3"`` emits ``delta.reasoning.content`` text parts.
      - Any other value emits ``delta.reasoning``, plus ``delta.reasoning_summary``
        for summary text.
      - In the think-tags and o3 modes, a newline separates successive
        reasoning-summary parts.
    - Tool calls:
      - Events whose type contains ``web_search_call`` become ``web_search`` tool
        calls. A ``finish_reason="tool_calls"`` chunk follows only on ``.completed``
        and ``.done`` events.
      - Finished ``function_call`` / ``web_search_call`` output items become
        ``delta.tool_calls`` entries, each followed by a
        ``finish_reason="tool_calls"`` chunk.
    - ``response.failed`` becomes a bare ``{"error": {"message": ...}}`` frame.
    - Other events, including ``response.output_text.done``, produce no output. The
      ``finish_reason="stop"`` chunk is sent on ``response.completed``.

    Malformed input:

    - Non-``data:`` lines, blank data, unparsable JSON and non-object JSON payloads
      are skipped.
    - An upstream ``data: [DONE]`` line ends translation without further output.
    - If the upstream connection drops mid-stream (``ConnectionError`` or
      ``requests`` chunked-encoding/connection errors raised while reading), a final
      ``data: [DONE]`` frame is emitted instead of raising.

    Args:
        upstream: Streaming response object. Only ``iter_lines(decode_unicode=False)``
            and ``close()`` are used; it is always closed when the generator finishes
            or is closed.
        model: Model name echoed in every chunk.
        created: Creation timestamp echoed in every chunk.
        verbose: When true and ``vlog`` is set, every non-empty upstream line and
            some tool-call diagnostics are passed to ``vlog``.
        vlog: Optional logging callable taking one string.
        reasoning_compat: Reasoning presentation mode (see above).
        include_usage: Emit a chunk carrying ``usage`` before ``[DONE]`` when the
            upstream reports token usage.
    """
    response_id = "chatcmpl-stream"
    compat = (reasoning_compat or "think-tags").strip().lower()
    think_open = False  # "<think>" has been emitted and not yet closed
    think_closed = False  # "</think>" has been emitted; later think-tag reasoning is dropped
    saw_any_summary = False  # at least one reasoning-summary part has started
    pending_summary_paragraph = False  # a later summary part started; prefix its first delta with "\n"
    upstream_usage: Optional[Dict[str, int]] = None
    # Tool-call arguments accumulated per call id across streamed events.
    ws_state: Dict[Any, Dict[str, Any]] = {}
    # Stable ``tool_calls[].index`` per call id, assigned in first-seen order.
    ws_index: Dict[Any, int] = {}
    ws_next_index = 0

    def _chunk(
        delta: Dict[str, Any],
        finish_reason: Optional[str] = None,
        usage: Optional[Dict[str, int]] = None,
    ) -> bytes:
        """Encode one ``chat.completion.chunk`` SSE frame for the current response id."""
        payload: Dict[str, Any] = {
            "id": response_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": model,
            "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
        }
        if usage is not None:
            payload["usage"] = usage
        return f"data: {json.dumps(payload)}\n\n".encode("utf-8")

    def _tool_call_chunk(index: int, call_id: Any, name: str, arguments: str) -> bytes:
        """Encode a chunk carrying one complete function-style tool call."""
        call = {
            "index": index,
            "id": call_id,
            "type": "function",
            "function": {"name": name, "arguments": arguments},
        }
        return _chunk({"tool_calls": [call]})

    def _tool_index(call_id: Any) -> int:
        """Return the tool_calls index for ``call_id``, allocating the next one on first use."""
        nonlocal ws_next_index
        if call_id not in ws_index:
            ws_index[call_id] = ws_next_index
            ws_next_index += 1
        return ws_index[call_id]

    def _serialize_tool_args(eff_args: Any) -> str:
        """Serialize tool-call arguments as a JSON string.

        Dicts and lists are dumped as-is. A string holding a JSON object or array is
        re-dumped; any other string is wrapped as ``{"query": <string>}``. Anything
        else becomes ``"{}"``.
        """
        if isinstance(eff_args, (dict, list)):
            return json.dumps(eff_args)
        if isinstance(eff_args, str):
            try:
                parsed = json.loads(eff_args)
            except ValueError:
                return json.dumps({"query": eff_args})
            if isinstance(parsed, (dict, list)):
                return json.dumps(parsed)
            return json.dumps({"query": eff_args})
        return "{}"

    def _extract_usage(evt: Dict[str, Any]) -> Optional[Dict[str, int]]:
        """Map ``response.usage`` to Chat Completions usage keys, or None if unavailable."""
        try:
            usage = (evt.get("response") or {}).get("usage")
            if not isinstance(usage, dict):
                return None
            pt = int(usage.get("input_tokens") or 0)
            ct = int(usage.get("output_tokens") or 0)
            tt = int(usage.get("total_tokens") or (pt + ct))
        except (AttributeError, TypeError, ValueError):
            return None
        return {"prompt_tokens": pt, "completion_tokens": ct, "total_tokens": tt}

    def _merge_web_search_params(params: Dict[str, Any], src: Any) -> None:
        """Fold web-search parameters found in ``src`` into ``params``.

        Dict-valued ``parameters``/``args``/``arguments``/``input`` entries are merged
        in, overwriting existing keys. Scalar aliases (``query``/``q``,
        ``domains``/``include_domains``/``include``, ``max_results``/``topn``/``limit``)
        and ``recency``/``time_range``/``days`` only fill keys not set yet.
        """
        if not isinstance(src, dict):
            return
        for key in ("parameters", "args", "arguments", "input"):
            if isinstance(src.get(key), dict):
                params.update(src[key])
        for key in ("query", "q"):
            if isinstance(src.get(key), str):
                params.setdefault("query", src[key])
        for key in ("recency", "time_range", "days"):
            if src.get(key) is not None and key not in params:
                params[key] = src[key]
        for key in ("domains", "include_domains", "include"):
            if isinstance(src.get(key), list) and "domains" not in params:
                params["domains"] = src[key]
        for key in ("max_results", "topn", "limit"):
            if src.get(key) is not None and "max_results" not in params:
                params["max_results"] = src[key]

    def _web_search_frames(kind: str, evt: Dict[str, Any]) -> List[bytes]:
        """Translate a ``*web_search_call*`` progress event into tool-call frames.

        Every event re-emits the full accumulated arguments for its call id. A
        ``tool_calls`` finish chunk follows on ``.completed``/``.done`` events.
        """
        call_id = evt.get("item_id") or "ws_call"
        if verbose and vlog:
            try:
                vlog(f"CM_TOOLS {kind} id={call_id} -> tool_calls(web_search)")
            except Exception:
                pass
        item = evt.get("item") if isinstance(evt.get("item"), dict) else {}
        params = ws_state.get(call_id)
        if not isinstance(params, dict):
            params = {}
        _merge_web_search_params(params, item)
        _merge_web_search_params(params, evt)
        if params:
            ws_state.setdefault(call_id, {}).update(params)
        args_str = _serialize_tool_args(ws_state.get(call_id, {}))
        frames = [_tool_call_chunk(_tool_index(call_id), call_id, "web_search", args_str)]
        if kind.endswith(".completed") or kind.endswith(".done"):
            frames.append(_chunk({}, "tool_calls"))
        return frames

    def _output_item_frames(item: Dict[str, Any]) -> List[bytes]:
        """Translate a finished function_call / web_search_call item into a tool call plus finish chunk."""
        item_type = item.get("type")
        call_id = item.get("call_id") or item.get("id") or ""
        if not isinstance(call_id, str):
            # A non-string id cannot be echoed as a tool-call id (and may be unhashable).
            return []
        name = item.get("name") or ("web_search" if item_type == "web_search_call" else "")
        raw_args = item.get("arguments") or item.get("parameters")
        if isinstance(raw_args, dict):
            ws_state.setdefault(call_id, {}).update(raw_args)
        fallback = raw_args if isinstance(raw_args, (dict, list, str)) else {}
        try:
            args = _serialize_tool_args(ws_state.get(call_id, fallback))
        except (TypeError, ValueError):
            args = "{}"
        if item_type == "web_search_call" and verbose and vlog:
            try:
                vlog(f"CM_TOOLS response.output_item.done web_search_call id={call_id} has_args={bool(args)}")
            except Exception:
                pass
        index = _tool_index(call_id)
        if not isinstance(name, str):
            return []
        return [_tool_call_chunk(index, call_id, name, args), _chunk({}, "tool_calls")]

    def _iter_events():
        """Yield JSON-object events from the upstream SSE body, stopping at ``[DONE]``.

        Read errors from the upstream iterator propagate to the caller.
        """
        for raw in upstream.iter_lines(decode_unicode=False):
            if not raw:
                continue
            line = raw.decode("utf-8", errors="ignore") if isinstance(raw, (bytes, bytearray)) else raw
            if verbose and vlog:
                vlog(line)
            if not line.startswith("data: "):
                continue
            data = line[len("data: "):].strip()
            if not data:
                continue
            if data == "[DONE]":
                return
            try:
                evt = json.loads(data)
            except (json.JSONDecodeError, UnicodeDecodeError):
                continue
            if isinstance(evt, dict):
                yield evt

    events = _iter_events()
    try:
        while True:
            # Read errors surface from next(), not from the loop body, so they must be
            # caught here. Builtin ConnectionError (incl. BrokenPipeError) is listed
            # first so the requests names are only looked up for other exceptions.
            stream_error: Optional[BaseException] = None
            try:
                evt = next(events, None)
            except ConnectionError as exc:
                stream_error = exc
            except (requests.exceptions.ChunkedEncodingError, requests.exceptions.ConnectionError) as exc:
                stream_error = exc
            if stream_error is not None:
                # Connection interrupted mid-stream - end gracefully.
                if verbose and vlog:
                    vlog(f"Stream interrupted: {stream_error}")
                yield b"data: [DONE]\n\n"
                return
            if evt is None:
                break

            kind = evt.get("type")
            response = evt.get("response")
            if isinstance(response, dict) and isinstance(response.get("id"), str):
                response_id = response.get("id") or response_id

            if isinstance(kind, str) and "web_search_call" in kind:
                try:
                    frames = _web_search_frames(kind, evt)
                except Exception as exc:
                    # Best effort: a malformed web-search event must not abort the stream.
                    frames = []
                    if verbose and vlog:
                        vlog(f"CM_TOOLS failed to translate {kind}: {exc}")
                for frame in frames:
                    yield frame

            if kind == "response.output_text.delta":
                if compat == "think-tags" and think_open and not think_closed:
                    yield _chunk({"content": "</think>"})
                    think_open, think_closed = False, True
                yield _chunk({"content": evt.get("delta") or ""})

            elif kind == "response.output_item.done":
                item = evt.get("item") or {}
                if isinstance(item, dict) and item.get("type") in ("function_call", "web_search_call"):
                    for frame in _output_item_frames(item):
                        yield frame

            elif kind == "response.reasoning_summary_part.added":
                if compat in ("think-tags", "o3"):
                    if saw_any_summary:
                        pending_summary_paragraph = True
                    else:
                        saw_any_summary = True

            elif kind in ("response.reasoning_summary_text.delta", "response.reasoning_text.delta"):
                delta_txt = evt.get("delta") or ""
                is_summary = kind == "response.reasoning_summary_text.delta"
                if compat == "o3":
                    if is_summary and pending_summary_paragraph:
                        yield _chunk({"reasoning": {"content": [{"type": "text", "text": "\n"}]}})
                        pending_summary_paragraph = False
                    yield _chunk({"reasoning": {"content": [{"type": "text", "text": delta_txt}]}})
                elif compat == "think-tags":
                    if not think_open and not think_closed:
                        yield _chunk({"content": "<think>"})
                        think_open = True
                    if think_open and not think_closed:
                        if is_summary and pending_summary_paragraph:
                            yield _chunk({"content": "\n"})
                            pending_summary_paragraph = False
                        yield _chunk({"content": delta_txt})
                elif is_summary:
                    yield _chunk({"reasoning_summary": delta_txt, "reasoning": delta_txt})
                else:
                    yield _chunk({"reasoning": delta_txt})

            elif kind == "response.failed":
                # "response" or its "error" may be missing or null.
                error = response.get("error") if isinstance(response, dict) else None
                message = error.get("message", "response.failed") if isinstance(error, dict) else "response.failed"
                yield f"data: {json.dumps({'error': {'message': message}})}\n\n".encode("utf-8")

            elif kind == "response.completed":
                usage = _extract_usage(evt)
                if usage:
                    upstream_usage = usage
                if compat == "think-tags" and think_open and not think_closed:
                    yield _chunk({"content": "</think>"})
                    think_open, think_closed = False, True
                yield _chunk({}, "stop")
                if include_usage and upstream_usage:
                    yield _chunk({}, None, upstream_usage)
                yield b"data: [DONE]\n\n"
                break
    finally:
        events.close()
        upstream.close()
|
|
|
|
|
|
def sse_translate_text(upstream, model: str, created: int, verbose: bool = False, vlog=None, *, include_usage: bool = False):
    """Translate an upstream Responses API SSE stream into ``text_completion.chunk`` frames.

    This is a generator of ``bytes`` SSE frames, each ``data: <json>`` followed by a
    blank line.

    Translation rules:

    - ``response.output_text.delta`` becomes a text chunk.
    - ``response.output_text.done`` and an upstream ``data: [DONE]`` line each
      become an empty chunk with ``finish_reason="stop"``.
    - ``response.completed`` optionally emits a usage chunk (when ``include_usage``
      and usage is reported), then ``data: [DONE]``, and ends translation.
    - Non-``data:`` lines, blank data, unparsable JSON and payloads that are not JSON
      objects are skipped.

    Args:
        upstream: Streaming response object. Only ``iter_lines(decode_unicode=False)``
            and ``close()`` are used; it is always closed when the generator finishes
            or is closed.
        model: Model name echoed in every chunk.
        created: Creation timestamp echoed in every chunk.
        verbose: When true and ``vlog`` is set, every non-empty upstream line is
            passed to ``vlog``.
        vlog: Optional logging callable taking one string.
        include_usage: Emit a chunk carrying ``usage`` before ``[DONE]``.
    """
    response_id = "cmpl-stream"
    upstream_usage: Optional[Dict[str, int]] = None

    def _extract_usage(evt: Dict[str, Any]) -> Optional[Dict[str, int]]:
        """Map ``response.usage`` to Completions usage keys, or None if unavailable."""
        try:
            usage = (evt.get("response") or {}).get("usage")
            if not isinstance(usage, dict):
                return None
            pt = int(usage.get("input_tokens") or 0)
            ct = int(usage.get("output_tokens") or 0)
            tt = int(usage.get("total_tokens") or (pt + ct))
        except (AttributeError, TypeError, ValueError):
            return None
        return {"prompt_tokens": pt, "completion_tokens": ct, "total_tokens": tt}

    def _chunk(text: str, finish_reason: Optional[str], usage: Optional[Dict[str, int]] = None) -> bytes:
        """Encode one ``text_completion.chunk`` SSE frame for the current response id."""
        payload: Dict[str, Any] = {
            "id": response_id,
            "object": "text_completion.chunk",
            "created": created,
            "model": model,
            "choices": [{"index": 0, "text": text, "finish_reason": finish_reason}],
        }
        if usage is not None:
            payload["usage"] = usage
        return f"data: {json.dumps(payload)}\n\n".encode("utf-8")

    try:
        for raw_line in upstream.iter_lines(decode_unicode=False):
            if not raw_line:
                continue
            line = raw_line.decode("utf-8", errors="ignore") if isinstance(raw_line, (bytes, bytearray)) else raw_line
            if verbose and vlog:
                vlog(line)
            if not line.startswith("data: "):
                continue
            data = line[len("data: "):].strip()
            if not data:
                continue
            if data == "[DONE]":
                yield _chunk("", "stop")
                continue
            try:
                evt = json.loads(data)
            except (ValueError, RecursionError):
                continue
            if not isinstance(evt, dict):
                # e.g. "data: 123" or "data: null" - nothing to translate.
                continue

            kind = evt.get("type")
            response = evt.get("response")
            if isinstance(response, dict) and isinstance(response.get("id"), str):
                response_id = response.get("id") or response_id

            if kind == "response.output_text.delta":
                yield _chunk(evt.get("delta") or "", None)
            elif kind == "response.output_text.done":
                yield _chunk("", "stop")
            elif kind == "response.completed":
                usage = _extract_usage(evt)
                if usage:
                    upstream_usage = usage
                if include_usage and upstream_usage:
                    yield _chunk("", None, upstream_usage)
                yield b"data: [DONE]\n\n"
                break
    finally:
        upstream.close()
|