feat: add responses api, websocket support, and fast mode

This commit is contained in:
Game_Time
2026-03-23 15:41:42 +05:00
parent e96db19538
commit 8754203ec6
22 changed files with 2148 additions and 119 deletions

View File

@@ -24,6 +24,7 @@ Set options in `.env` or pass environment variables:
- `CHATGPT_LOCAL_REASONING_EFFORT`: minimal|low|medium|high|xhigh
- `CHATGPT_LOCAL_REASONING_SUMMARY`: auto|concise|detailed|none
- `CHATGPT_LOCAL_REASONING_COMPAT`: legacy|o3|think-tags|current
- `CHATGPT_LOCAL_FAST_MODE`: `true|false` to enable fast mode by default for supported models
- `CHATGPT_LOCAL_DEBUG_MODEL`: force model override (e.g., `gpt-5.4`)
- `CHATGPT_LOCAL_CLIENT_ID`: OAuth client id override (rarely needed)
- `CHATGPT_LOCAL_EXPOSE_REASONING_MODELS`: `true|false` to add reasoning model variants to `/v1/models`

179
README.md
View File

@@ -1,172 +1,175 @@
<div align="center">
<h1>ChatMock
<div align="center">
<a href="https://github.com/RayBytes/ChatMock/stargazers"><img src="https://img.shields.io/github/stars/RayBytes/ChatMock" alt="Stars Badge"/></a>
<a href="https://github.com/RayBytes/ChatMock/network/members"><img src="https://img.shields.io/github/forks/RayBytes/ChatMock" alt="Forks Badge"/></a>
<a href="https://github.com/RayBytes/ChatMock/pulls"><img src="https://img.shields.io/github/issues-pr/RayBytes/ChatMock" alt="Pull Requests Badge"/></a>
<a href="https://github.com/RayBytes/ChatMock/issues"><img src="https://img.shields.io/github/issues/RayBytes/ChatMock" alt="Issues Badge"/></a>
<a href="https://github.com/RayBytes/ChatMock/graphs/contributors"><img alt="GitHub contributors" src="https://img.shields.io/github/contributors/RayBytes/ChatMock?color=2b9348"></a>
<a href="https://github.com/RayBytes/ChatMock/blob/master/LICENSE"><img src="https://img.shields.io/github/license/RayBytes/ChatMock?color=2b9348" alt="License Badge"/></a>
</div>
</h1>
<p><b>OpenAI & Ollama compatible API powered by your ChatGPT plan.</b></p>
<p>Use your ChatGPT Plus/Pro account to call OpenAI models from code or alternate chat UIs.</p>
<br>
# ChatMock
**Allows Codex to work in your favourite chat apps and coding tools.**
[![PyPI](https://img.shields.io/pypi/v/chatmock?color=blue&label=pypi)](https://pypi.org/project/chatmock/)
[![Python](https://img.shields.io/pypi/pyversions/chatmock)](https://pypi.org/project/chatmock/)
[![License](https://img.shields.io/github/license/RayBytes/ChatMock)](LICENSE)
[![Stars](https://img.shields.io/github/stars/RayBytes/ChatMock?style=flat)](https://github.com/RayBytes/ChatMock/stargazers)
[![Last Commit](https://img.shields.io/github/last-commit/RayBytes/ChatMock)](https://github.com/RayBytes/ChatMock/commits/main)
[![Issues](https://img.shields.io/github/issues/RayBytes/ChatMock)](https://github.com/RayBytes/ChatMock/issues)
<br>
</div>
## What It Does
<br>
ChatMock runs a local server that creates an OpenAI/Ollama compatible API, and requests are then fulfilled using your authenticated ChatGPT login with the oauth client of Codex, OpenAI's coding CLI tool. This allows you to use GPT-5, GPT-5-Codex, and other models right through your OpenAI account, without requiring an api key. You are then able to use it in other chat apps or other coding tools. <br>
This does require a paid ChatGPT account.
## Quickstart
### Homebrew
## Install
#### Homebrew
```bash
brew tap RayBytes/chatmock
brew install chatmock
```
### CLI
#### pipx / pip
```bash
pipx install chatmock
```
### GUI
#### GUI
Download from [releases](https://github.com/RayBytes/ChatMock/releases) (macOS & Windows)
If you're on **macOS** or **Windows**, you can download the GUI app from the [GitHub releases](https://github.com/RayBytes/ChatMock/releases).
#### Docker
See [DOCKER.md](DOCKER.md)
### Python
If you wish to just simply run this as a python flask server, you are also freely welcome too.
<br>
Clone or download this repository, then cd into the project directory. Then follow the instrunctions listed below.
1. Sign in with your ChatGPT account and follow the prompts
```bash
python chatmock.py login
```
You can make sure this worked by running `python chatmock.py info`
2. After the login completes successfully, you can just simply start the local server
## Getting Started
```bash
python chatmock.py serve
# 1. Sign in with your ChatGPT account
chatmock login
# 2. Start the server
chatmock serve
```
Then, you can simply use the address and port as the baseURL as you require (http://127.0.0.1:8000 by default)
**Reminder:** When setting a baseURL in other applications, make you sure you include /v1/ at the end of the URL if you're using this as a OpenAI compatible endpoint (e.g http://127.0.0.1:8000/v1)
The server runs at `http://127.0.0.1:8000` by default. Use `http://127.0.0.1:8000/v1` as your base URL for OpenAI-compatible apps.
### Docker
<br>
Read [the docker instrunctions here](https://github.com/RayBytes/ChatMock/blob/main/DOCKER.md)
## Usage
# Examples
### Python
<details open>
<summary><b>Python</b></summary>
```python
from openai import OpenAI
client = OpenAI(
base_url="http://127.0.0.1:8000/v1",
api_key="key" # ignored
api_key="anything" # not checked
)
resp = client.chat.completions.create(
response = client.chat.completions.create(
model="gpt-5.4",
messages=[{"role": "user", "content": "hello world"}]
messages=[{"role": "user", "content": "hello"}]
)
print(resp.choices[0].message.content)
print(response.choices[0].message.content)
```
### curl
</details>
<details>
<summary><b>cURL</b></summary>
```bash
curl http://127.0.0.1:8000/v1/chat/completions \
-H "Authorization: Bearer key" \
-H "Content-Type: application/json" \
-d '{
"model": "gpt-5.4",
"messages": [{"role":"user","content":"hello world"}]
"messages": [{"role": "user", "content": "hello"}]
}'
```
# What's supported
</details>
- Tool/Function calling
- Vision/Image understanding
- Thinking summaries (through thinking tags)
- Thinking effort
<br>
## Notes & Limits
## Supported Models
- Requires an active, paid ChatGPT account.
- Some context length might be taken up by internal instructions (but they dont seem to degrade the model)
- Use responsibly and at your own risk. This project is not affiliated with OpenAI, and is a educational exercise.
# Supported models
- `gpt-5.4`
- `gpt-5.4-mini`
- `gpt-5.2`
- `gpt-5.1`
- `gpt-5`
- `gpt-5.3-codex`
- `gpt-5-codex`
- `gpt-5.3-codex-spark`
- `gpt-5.2-codex`
- `gpt-5-codex`
- `gpt-5.1-codex`
- `gpt-5.1-codex-max`
- `gpt-5.1-codex-mini`
- `codex-mini`
# Customisation / Configuration
<br>
### Thinking effort
## Features
- `--reasoning-effort` (choice of none,minimal,low,medium,high,xhigh)<br>
GPT-5 has a configurable amount of "effort" it can put into thinking, which may cause it to take more time for a response to return, but may overall give a smarter answer. Applying this parameter after `serve` forces the server to use this reasoning effort by default, unless overrided by the API request with a different effort set. The default reasoning effort without setting this parameter is `medium`.<br>
The `gpt-5.1` family (including codex) supports `low`, `medium`, and `high` while `gpt-5.1-codex-max` adds `xhigh`. The `gpt-5.2` and `gpt-5.3` families (including codex) support `low`, `medium`, `high`, and `xhigh`. `gpt-5.4` supports `none`, `low`, `medium`, `high`, and `xhigh`.
- Tool / function calling
- Vision / image input
- Thinking summaries (via think tags)
- Configurable thinking effort
- Fast mode for supported models
- Web search tool
- OpenAI-compatible `/v1/responses` (HTTP + WebSocket)
- Ollama-compatible endpoints
- Reasoning effort exposed as separate models (optional)
### Thinking summaries
<br>
- `--reasoning-summary` (choice of auto,concise,detailed,none)<br>
Models like GPT-5 do not return raw thinking content, but instead return thinking summaries. These can also be customised by you.
## Configuration
### OpenAI Tools
All flags go after `chatmock serve`. These can also be set as environment variables.
- `--enable-web-search`<br>
You can also access OpenAI tools through this project. Currently, only web search is available.
You can enable it by starting the server with this parameter, which will allow OpenAI to determine when a request requires a web search, or you can use the following parameters during a request to the API to enable web search:
<br><br>
`responses_tools`: supports `[{"type":"web_search"}]` / `{ "type": "web_search_preview" }`<br>
`responses_tool_choice`: `"auto"` or `"none"`
| Flag | Env var | Options | Default | Description |
|------|---------|---------|---------|-------------|
| `--reasoning-effort` | `CHATGPT_LOCAL_REASONING_EFFORT` | none, minimal, low, medium, high, xhigh | medium | How hard the model thinks |
| `--reasoning-summary` | `CHATGPT_LOCAL_REASONING_SUMMARY` | auto, concise, detailed, none | auto | Thinking summary verbosity |
| `--reasoning-compat` | `CHATGPT_LOCAL_REASONING_COMPAT` | legacy, o3, think-tags | think-tags | How reasoning is returned to the client |
| `--fast-mode` | `CHATGPT_LOCAL_FAST_MODE` | true/false | false | Priority processing for supported models |
| `--enable-web-search` | `CHATGPT_LOCAL_ENABLE_WEB_SEARCH` | true/false | false | Allow the model to search the web |
| `--expose-reasoning-models` | `CHATGPT_LOCAL_EXPOSE_REASONING_MODELS` | true/false | false | List each reasoning level as its own model |
<details>
<summary><b>Web search in a request</b></summary>
#### Example usage
```json
{
"model": "gpt-5.4",
"messages": [{"role":"user","content":"Find current METAR rules"}],
"stream": true,
"messages": [{"role": "user", "content": "latest news on ..."}],
"responses_tools": [{"type": "web_search"}],
"responses_tool_choice": "auto"
}
```
### Expose reasoning models
</details>
- `--expose-reasoning-models`<br>
If your preferred app doesnt support selecting reasoning effort, or you just want a simpler approach, this parameter exposes each reasoning level as a separate, queryable model. Each reasoning level also appears individually under /v1/models, so model pickers in your favorite chat apps will list all reasoning options as distinct models you can switch between.
<details>
<summary><b>Fast mode in a request</b></summary>
```json
{
"model": "gpt-5.4",
"input": "summarize this",
"fast_mode": true
}
```
</details>
<br>
## Notes
If you wish to have the fastest responses, I'd recommend setting `--reasoning-effort` to low, and `--reasoning-summary` to none. <br>
All parameters and choices can be seen by sending `python chatmock.py serve --h`<br>
The context size of this route is also larger than what you get access to in the regular ChatGPT app.<br>
When the model returns a thinking summary, the model will send back thinking tags to make it compatible with chat apps. **If you don't like this behavior, you can instead set `--reasoning-compat` to legacy, and reasoning will be set in the reasoning tag instead of being returned in the actual response text.**
Use responsibly and at your own risk. This project is not affiliated with OpenAI.
<br>
## Star History

View File

@@ -1,11 +1,13 @@
from __future__ import annotations
from flask import Flask, jsonify
from flask_sock import Sock
from .config import BASE_INSTRUCTIONS, GPT5_CODEX_INSTRUCTIONS
from .http import build_cors_headers
from .routes_openai import openai_bp
from .routes_ollama import ollama_bp
from .websocket_routes import register_websocket_routes
def create_app(
@@ -14,6 +16,7 @@ def create_app(
reasoning_effort: str = "medium",
reasoning_summary: str = "auto",
reasoning_compat: str = "think-tags",
fast_mode: bool = False,
debug_model: str | None = None,
expose_reasoning_models: bool = False,
default_web_search: bool = False,
@@ -26,6 +29,7 @@ def create_app(
REASONING_EFFORT=reasoning_effort,
REASONING_SUMMARY=reasoning_summary,
REASONING_COMPAT=reasoning_compat,
FAST_MODE=bool(fast_mode),
DEBUG_MODEL=debug_model,
BASE_INSTRUCTIONS=BASE_INSTRUCTIONS,
GPT5_CODEX_INSTRUCTIONS=GPT5_CODEX_INSTRUCTIONS,
@@ -46,5 +50,7 @@ def create_app(
app.register_blueprint(openai_bp)
app.register_blueprint(ollama_bp)
sock = Sock(app)
register_websocket_routes(sock)
return app

View File

@@ -267,6 +267,7 @@ def cmd_serve(
reasoning_effort: str,
reasoning_summary: str,
reasoning_compat: str,
fast_mode: bool,
debug_model: str | None,
expose_reasoning_models: bool,
default_web_search: bool,
@@ -277,6 +278,7 @@ def cmd_serve(
reasoning_effort=reasoning_effort,
reasoning_summary=reasoning_summary,
reasoning_compat=reasoning_compat,
fast_mode=fast_mode,
debug_model=debug_model,
expose_reasoning_models=expose_reasoning_models,
default_web_search=default_web_search,
@@ -309,6 +311,12 @@ def main() -> None:
default=os.getenv("CHATGPT_LOCAL_DEBUG_MODEL"),
help="Forcibly override requested 'model' with this value",
)
p_serve.add_argument(
"--fast-mode",
action=argparse.BooleanOptionalAction,
default=(os.getenv("CHATGPT_LOCAL_FAST_MODE") or "").strip().lower() in ("1", "true", "yes", "on"),
help="Enable GPT fast mode by default for supported models; request-level overrides still take precedence.",
)
p_serve.add_argument(
"--reasoning-effort",
choices=["none", "minimal", "low", "medium", "high", "xhigh"],
@@ -366,6 +374,7 @@ def main() -> None:
reasoning_effort=args.reasoning_effort,
reasoning_summary=args.reasoning_summary,
reasoning_compat=args.reasoning_compat,
fast_mode=args.fast_mode,
debug_model=args.debug_model,
expose_reasoning_models=args.expose_reasoning_models,
default_web_search=args.enable_web_search,

92
chatmock/fast_mode.py Normal file
View File

@@ -0,0 +1,92 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from .model_registry import normalize_model_name
PRIORITY_SUPPORTED_MODELS = frozenset(
(
"gpt-5.4",
"gpt-5.2",
"gpt-5.1",
"gpt-5",
"gpt-5.1-codex",
"gpt-5-codex",
)
)
_TRUE_STRINGS = {"1", "true", "yes", "on"}
_FALSE_STRINGS = {"0", "false", "no", "off"}
def parse_optional_bool(value: Any) -> bool | None:
if isinstance(value, bool):
return value
if isinstance(value, str):
normalized = value.strip().lower()
if normalized in _TRUE_STRINGS:
return True
if normalized in _FALSE_STRINGS:
return False
return None
def supports_priority_service_tier(model: str | None) -> bool:
return normalize_model_name(model) in PRIORITY_SUPPORTED_MODELS
@dataclass(frozen=True)
class ServiceTierResolution:
service_tier: str | None
error_message: str | None = None
warning_message: str | None = None
used_server_default: bool = False
def resolve_service_tier(
model: str | None,
*,
request_fast_mode: Any = None,
request_service_tier: Any = None,
server_fast_mode: bool = False,
) -> ServiceTierResolution:
explicit_fast_mode = parse_optional_bool(request_fast_mode)
tier: str | None = None
explicit_request = False
used_server_default = False
if explicit_fast_mode is not None:
tier = "priority" if explicit_fast_mode else None
explicit_request = True
elif isinstance(request_service_tier, str) and request_service_tier.strip():
tier = request_service_tier.strip().lower()
explicit_request = True
elif server_fast_mode:
tier = "priority"
used_server_default = True
if tier == "priority" and not supports_priority_service_tier(model):
normalized = normalize_model_name(model)
message = (
f"Fast mode is not supported for model '{normalized}'. "
"Use a supported GPT-5 priority-processing model or disable fast mode for this request."
)
if explicit_request:
return ServiceTierResolution(
service_tier=None,
error_message=message,
used_server_default=used_server_default,
)
return ServiceTierResolution(
service_tier=None,
warning_message=message,
used_server_default=used_server_default,
)
return ServiceTierResolution(
service_tier=tier,
used_server_default=used_server_default,
)

View File

@@ -62,6 +62,14 @@ _MODEL_SPECS = (
variant_efforts=("xhigh", "high", "medium", "low"),
uses_codex_instructions=True,
),
ModelSpec(
public_id="gpt-5.3-codex-spark",
upstream_id="gpt-5.3-codex-spark",
aliases=("gpt5.3-codex-spark", "gpt-5.3-codex-spark-latest"),
allowed_efforts=frozenset(("low", "medium", "high", "xhigh")),
variant_efforts=("xhigh", "high", "medium", "low"),
uses_codex_instructions=True,
),
ModelSpec(
public_id="gpt-5-codex",
upstream_id="gpt-5-codex",

242
chatmock/responses_api.py Normal file
View File

@@ -0,0 +1,242 @@
from __future__ import annotations
import json
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Iterator, List
from .config import BASE_INSTRUCTIONS, GPT5_CODEX_INSTRUCTIONS
from .fast_mode import ServiceTierResolution, resolve_service_tier
from .model_registry import (
allowed_efforts_for_model,
extract_reasoning_from_model_name,
normalize_model_name,
uses_codex_instructions,
)
from .reasoning import build_reasoning_param
from .session import ensure_session_id
@dataclass(frozen=True)
class ResponsesRequestError(Exception):
message: str
status_code: int = 400
code: str | None = None
def __str__(self) -> str:
return self.message
@dataclass(frozen=True)
class NormalizedResponsesRequest:
payload: Dict[str, Any]
requested_model: str | None
normalized_model: str
session_id: str
service_tier_resolution: ServiceTierResolution
def instructions_for_model(config: Dict[str, Any], model: str) -> str:
base = config.get("BASE_INSTRUCTIONS", BASE_INSTRUCTIONS)
if uses_codex_instructions(model):
codex = config.get("GPT5_CODEX_INSTRUCTIONS") or GPT5_CODEX_INSTRUCTIONS
if isinstance(codex, str) and codex.strip():
return codex
return base
def extract_client_session_id(headers: Any) -> str | None:
try:
return headers.get("X-Session-Id") or headers.get("session_id") or None
except Exception:
return None
def _input_items_for_session(raw_input: Any) -> List[Dict[str, Any]]:
if isinstance(raw_input, list):
return [item for item in raw_input if isinstance(item, dict)]
if isinstance(raw_input, dict):
return [raw_input]
if isinstance(raw_input, str) and raw_input.strip():
return [
{
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": raw_input}],
}
]
return []
def canonicalize_responses_input(raw_input: Any) -> Any:
if isinstance(raw_input, list):
return [item for item in raw_input if isinstance(item, dict)]
if isinstance(raw_input, dict):
return [raw_input]
if isinstance(raw_input, str):
return _input_items_for_session(raw_input)
return raw_input
def normalize_responses_payload(
payload: Dict[str, Any],
*,
config: Dict[str, Any],
client_session_id: str | None = None,
) -> NormalizedResponsesRequest:
requested_model = payload.get("model") if isinstance(payload.get("model"), str) else None
normalized_model = normalize_model_name(requested_model, config.get("DEBUG_MODEL"))
normalized = dict(payload)
normalized["model"] = normalized_model
if "input" in normalized:
normalized["input"] = canonicalize_responses_input(normalized.get("input"))
if "store" not in normalized:
normalized["store"] = False
instructions = normalized.get("instructions")
if not isinstance(instructions, str) or not instructions.strip():
instructions = instructions_for_model(config, normalized_model)
normalized["instructions"] = instructions
reasoning_effort = config.get("REASONING_EFFORT", "medium")
reasoning_summary = config.get("REASONING_SUMMARY", "auto")
reasoning_overrides = (
normalized.get("reasoning")
if isinstance(normalized.get("reasoning"), dict)
else extract_reasoning_from_model_name(requested_model)
)
normalized["reasoning"] = build_reasoning_param(
reasoning_effort,
reasoning_summary,
reasoning_overrides,
allowed_efforts=allowed_efforts_for_model(normalized_model),
)
include = normalized.get("include")
include_list = [item for item in include if isinstance(item, str)] if isinstance(include, list) else []
if "reasoning.encrypted_content" not in include_list:
include_list.append("reasoning.encrypted_content")
normalized["include"] = include_list
tools = normalized.get("tools")
if (not isinstance(tools, list) or not tools) and bool(config.get("DEFAULT_WEB_SEARCH")):
tool_choice = normalized.get("tool_choice")
if not (isinstance(tool_choice, str) and tool_choice.strip().lower() == "none"):
normalized["tools"] = [{"type": "web_search"}]
service_tier_resolution = resolve_service_tier(
normalized_model,
request_fast_mode=normalized.get("fast_mode"),
request_service_tier=normalized.get("service_tier"),
server_fast_mode=bool(config.get("FAST_MODE")),
)
if service_tier_resolution.error_message:
raise ResponsesRequestError(service_tier_resolution.error_message)
if service_tier_resolution.service_tier is None:
normalized.pop("service_tier", None)
else:
normalized["service_tier"] = service_tier_resolution.service_tier
normalized.pop("fast_mode", None)
input_items = _input_items_for_session(normalized.get("input"))
session_id = ensure_session_id(instructions, input_items, client_session_id)
prompt_cache_key = normalized.get("prompt_cache_key")
if not isinstance(prompt_cache_key, str) or not prompt_cache_key.strip():
normalized["prompt_cache_key"] = session_id
return NormalizedResponsesRequest(
payload=normalized,
requested_model=requested_model,
normalized_model=normalized_model,
session_id=session_id,
service_tier_resolution=service_tier_resolution,
)
def iter_sse_event_payloads(upstream: Any) -> Iterator[Dict[str, Any]]:
for raw in upstream.iter_lines(decode_unicode=False):
if not raw:
continue
line = raw.decode("utf-8", errors="ignore") if isinstance(raw, (bytes, bytearray)) else raw
if not line.startswith("data: "):
continue
data = line[len("data: ") :].strip()
if not data or data == "[DONE]":
if data == "[DONE]":
break
continue
try:
evt = json.loads(data)
except Exception:
continue
if isinstance(evt, dict):
yield evt
def aggregate_response_from_sse(
upstream: Any,
*,
on_event: Any | None = None,
) -> tuple[Dict[str, Any] | None, Dict[str, Any] | None]:
response_obj: Dict[str, Any] | None = None
error_obj: Dict[str, Any] | None = None
try:
for evt in iter_sse_event_payloads(upstream):
if callable(on_event):
try:
on_event(evt)
except Exception:
pass
response = evt.get("response")
if isinstance(response, dict):
response_obj = response
kind = evt.get("type")
if kind == "response.failed":
if isinstance(response, dict) and isinstance(response.get("error"), dict):
error_obj = {"error": response.get("error")}
else:
error_obj = {"error": {"message": "response.failed"}}
break
if kind == "response.completed":
break
finally:
upstream.close()
return response_obj, error_obj
def stream_upstream_bytes(
upstream: Any,
*,
on_event: Any | None = None,
) -> Iterable[bytes]:
buffer = b""
try:
for chunk in upstream.iter_content(chunk_size=None):
if chunk:
if callable(on_event):
if isinstance(chunk, bytes):
buffer += chunk
else:
buffer += str(chunk).encode("utf-8", errors="ignore")
while b"\n" in buffer:
line, buffer = buffer.split(b"\n", 1)
line = line.rstrip(b"\r")
if not line.startswith(b"data: "):
continue
data = line[len(b"data: ") :].strip()
if not data or data == b"[DONE]":
continue
try:
evt = json.loads(data.decode("utf-8", errors="ignore"))
except Exception:
evt = None
if isinstance(evt, dict):
try:
on_event(evt)
except Exception:
pass
yield chunk
finally:
upstream.close()

View File

@@ -8,9 +8,11 @@ from typing import Any, Dict, List
from flask import Blueprint, Response, current_app, jsonify, make_response, request, stream_with_context
from .config import BASE_INSTRUCTIONS, GPT5_CODEX_INSTRUCTIONS
from .fast_mode import resolve_service_tier
from .limits import record_rate_limits_from_response
from .http import build_cors_headers
from .model_registry import list_public_models, uses_codex_instructions
from .responses_api import instructions_for_model
from .reasoning import (
allowed_efforts_for_model,
build_reasoning_param,
@@ -71,12 +73,7 @@ def ollama_version() -> Response:
def _instructions_for_model(model: str) -> str:
base = current_app.config.get("BASE_INSTRUCTIONS", BASE_INSTRUCTIONS)
if uses_codex_instructions(model):
codex = current_app.config.get("GPT5_CODEX_INSTRUCTIONS") or GPT5_CODEX_INSTRUCTIONS
if isinstance(codex, str) and codex.strip():
return codex
return base
return instructions_for_model(current_app.config, model)
_OLLAMA_FAKE_EVAL = {
@@ -254,6 +251,19 @@ def ollama_chat() -> Response:
model_reasoning = extract_reasoning_from_model_name(model)
normalized_model = normalize_model_name(model)
service_tier_resolution = resolve_service_tier(
normalized_model,
request_fast_mode=payload.get("fast_mode"),
request_service_tier=payload.get("service_tier"),
server_fast_mode=bool(current_app.config.get("FAST_MODE")),
)
if service_tier_resolution.warning_message and verbose:
print(f"[FastMode] {service_tier_resolution.warning_message}")
if service_tier_resolution.error_message:
err = {"error": service_tier_resolution.error_message}
if verbose:
_log_json("OUT POST /api/chat", err)
return jsonify(err), 400
upstream, error_resp = start_upstream_request(
normalized_model,
input_items,
@@ -267,6 +277,7 @@ def ollama_chat() -> Response:
model_reasoning,
allowed_efforts=allowed_efforts_for_model(model),
),
service_tier=service_tier_resolution.service_tier,
)
if error_resp is not None:
if verbose:
@@ -307,6 +318,7 @@ def ollama_chat() -> Response:
model_reasoning,
allowed_efforts=allowed_efforts_for_model(model),
),
service_tier=service_tier_resolution.service_tier,
)
record_rate_limits_from_response(upstream2)
if err2 is None and upstream2 is not None and upstream2.status_code < 400:

View File

@@ -7,16 +7,31 @@ from typing import Any, Dict, List
from flask import Blueprint, Response, current_app, jsonify, make_response, request
from .config import BASE_INSTRUCTIONS, GPT5_CODEX_INSTRUCTIONS
from .fast_mode import resolve_service_tier
from .limits import record_rate_limits_from_response
from .http import build_cors_headers
from .model_registry import list_public_models, uses_codex_instructions
from .responses_api import (
ResponsesRequestError,
aggregate_response_from_sse,
extract_client_session_id,
instructions_for_model,
normalize_responses_payload,
stream_upstream_bytes,
)
from .reasoning import (
allowed_efforts_for_model,
apply_reasoning_to_message,
build_reasoning_param,
extract_reasoning_from_model_name,
)
from .upstream import normalize_model_name, start_upstream_request
from .session import (
clear_responses_reuse_state,
note_responses_final_response,
note_responses_stream_event,
prepare_responses_request_for_session,
)
from .upstream import normalize_model_name, start_upstream_raw_request, start_upstream_request
from .utils import (
convert_chat_messages_to_responses_input,
convert_tools_chat_to_responses,
@@ -59,12 +74,32 @@ def _wrap_stream_logging(label: str, iterator, enabled: bool):
def _instructions_for_model(model: str) -> str:
base = current_app.config.get("BASE_INSTRUCTIONS", BASE_INSTRUCTIONS)
if uses_codex_instructions(model):
codex = current_app.config.get("GPT5_CODEX_INSTRUCTIONS") or GPT5_CODEX_INSTRUCTIONS
if isinstance(codex, str) and codex.strip():
return codex
return base
return instructions_for_model(current_app.config, model)
def _service_tier_from_payload(
model: str,
payload: Dict[str, Any],
*,
verbose: bool = False,
) -> tuple[str | None, Response | None]:
resolution = resolve_service_tier(
model,
request_fast_mode=payload.get("fast_mode"),
request_service_tier=payload.get("service_tier"),
server_fast_mode=bool(current_app.config.get("FAST_MODE")),
)
if resolution.warning_message and verbose:
print(f"[FastMode] {resolution.warning_message}")
if resolution.error_message:
err = {"error": {"message": resolution.error_message}}
if verbose:
_log_json("OUT POST service_tier resolution", err)
resp = make_response(jsonify(err), 400)
for k, v in build_cors_headers().items():
resp.headers.setdefault(k, v)
return None, resp
return resolution.service_tier, None
@openai_bp.route("/v1/chat/completions", methods=["POST"])
@@ -178,6 +213,9 @@ def chat_completions() -> Response:
reasoning_overrides,
allowed_efforts=allowed_efforts_for_model(model),
)
service_tier, tier_error = _service_tier_from_payload(model, payload, verbose=verbose)
if tier_error is not None:
return tier_error
upstream, error_resp = start_upstream_request(
model,
@@ -187,6 +225,7 @@ def chat_completions() -> Response:
tool_choice=tool_choice,
parallel_tool_calls=parallel_tool_calls,
reasoning_param=reasoning_param,
service_tier=service_tier,
)
if error_resp is not None:
if verbose:
@@ -224,6 +263,7 @@ def chat_completions() -> Response:
tool_choice=safe_choice,
parallel_tool_calls=parallel_tool_calls,
reasoning_param=reasoning_param,
service_tier=service_tier,
)
record_rate_limits_from_response(upstream2)
if err2 is None and upstream2 is not None and upstream2.status_code < 400:
@@ -413,11 +453,15 @@ def completions() -> Response:
reasoning_overrides,
allowed_efforts=allowed_efforts_for_model(model),
)
service_tier, tier_error = _service_tier_from_payload(model, payload, verbose=verbose)
if tier_error is not None:
return tier_error
upstream, error_resp = start_upstream_request(
model,
input_items,
instructions=_instructions_for_model(model),
reasoning_param=reasoning_param,
service_tier=service_tier,
)
if error_resp is not None:
if verbose:
@@ -529,6 +573,161 @@ def completions() -> Response:
return resp
@openai_bp.route("/v1/responses", methods=["POST"])
def responses_create() -> Response:
verbose = bool(current_app.config.get("VERBOSE"))
raw = request.get_data(cache=True, as_text=True) or ""
if verbose:
try:
print("IN POST /v1/responses\n" + raw)
except Exception:
pass
try:
payload = json.loads(raw) if raw else {}
except Exception:
err = {"error": {"message": "Invalid JSON body"}}
if verbose:
_log_json("OUT POST /v1/responses", err)
return jsonify(err), 400
if not isinstance(payload, dict):
err = {"error": {"message": "Request body must be a JSON object"}}
if verbose:
_log_json("OUT POST /v1/responses", err)
return jsonify(err), 400
try:
normalized = normalize_responses_payload(
payload,
config=current_app.config,
client_session_id=extract_client_session_id(request.headers),
)
except ResponsesRequestError as exc:
err: Dict[str, Any] = {"error": {"message": str(exc)}}
if exc.code:
err["error"]["code"] = exc.code
if verbose:
_log_json("OUT POST /v1/responses", err)
return jsonify(err), exc.status_code
if normalized.service_tier_resolution.warning_message and verbose:
print(f"[FastMode] {normalized.service_tier_resolution.warning_message}")
prepared = prepare_responses_request_for_session(
normalized.session_id,
normalized.payload,
allow_previous_response_id=False,
)
stream_req = bool(prepared.payload.get("stream", False))
upstream_payload = dict(prepared.payload)
upstream_payload["stream"] = True
upstream, error_resp = start_upstream_raw_request(
upstream_payload,
session_id=normalized.session_id,
stream=True,
)
if error_resp is not None:
clear_responses_reuse_state(normalized.session_id)
if verbose:
try:
body = error_resp.get_data(as_text=True)
if body:
try:
parsed = json.loads(body)
except Exception:
parsed = body
_log_json("OUT POST /v1/responses", parsed)
except Exception:
pass
return error_resp
record_rate_limits_from_response(upstream)
if upstream.status_code >= 400:
try:
err_body = json.loads(upstream.content.decode("utf-8", errors="ignore")) if upstream.content else {"error": {"message": upstream.text}}
except Exception:
err_body = {"error": {"message": upstream.text or "Upstream error"}}
finally:
upstream.close()
clear_responses_reuse_state(normalized.session_id)
if verbose:
_log_json("OUT POST /v1/responses", err_body)
resp = make_response(jsonify(err_body), upstream.status_code)
for k, v in build_cors_headers().items():
resp.headers.setdefault(k, v)
return resp
if stream_req:
if verbose:
print("OUT POST /v1/responses (streaming response)")
stream_iter = _wrap_stream_logging(
"STREAM OUT /v1/responses",
stream_upstream_bytes(
upstream,
on_event=lambda evt: note_responses_stream_event(normalized.session_id, evt),
),
verbose,
)
resp = Response(
stream_iter,
status=upstream.status_code,
mimetype="text/event-stream",
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
)
for k, v in build_cors_headers().items():
resp.headers.setdefault(k, v)
return resp
content_type = upstream.headers.get("Content-Type", "")
if "application/json" in content_type.lower():
try:
body = upstream.json()
except Exception:
body = None
finally:
upstream.close()
if isinstance(body, dict):
note_responses_final_response(normalized.session_id, body)
if verbose:
_log_json("OUT POST /v1/responses", body)
resp = make_response(jsonify(body), upstream.status_code)
for k, v in build_cors_headers().items():
resp.headers.setdefault(k, v)
return resp
response_obj, error_obj = aggregate_response_from_sse(
upstream,
on_event=lambda evt: note_responses_stream_event(normalized.session_id, evt),
)
if error_obj is not None:
clear_responses_reuse_state(normalized.session_id)
if verbose:
_log_json("OUT POST /v1/responses", error_obj)
resp = make_response(jsonify(error_obj), 502)
for k, v in build_cors_headers().items():
resp.headers.setdefault(k, v)
return resp
if response_obj is None:
clear_responses_reuse_state(normalized.session_id)
err = {"error": {"message": "Upstream response stream did not contain a completed response object"}}
if verbose:
_log_json("OUT POST /v1/responses", err)
resp = make_response(jsonify(err), 502)
for k, v in build_cors_headers().items():
resp.headers.setdefault(k, v)
return resp
if verbose:
_log_json("OUT POST /v1/responses", response_obj)
resp = make_response(jsonify(response_obj), upstream.status_code)
for k, v in build_cors_headers().items():
resp.headers.setdefault(k, v)
return resp
@openai_bp.route("/v1/models", methods=["GET"])
def list_models() -> Response:
expose_variants = bool(current_app.config.get("EXPOSE_REASONING_MODELS"))

View File

@@ -1,16 +1,37 @@
from __future__ import annotations
import copy
import hashlib
import json
import threading
import uuid
from typing import Any, Dict, List, Tuple
from dataclasses import dataclass, field
from typing import Any, Dict, List
_LOCK = threading.Lock()
_FINGERPRINT_TO_UUID: Dict[str, str] = {}
_ORDER: List[str] = []
_MAX_ENTRIES = 10000
_RESPONSES_SESSION_STATE: Dict[str, "_ResponsesSessionState"] = {}
_RESPONSES_ORDER: List[str] = []
@dataclass(frozen=True)
class PreparedResponsesRequest:
payload: Dict[str, Any]
session_id: str
@dataclass
class _ResponsesSessionState:
last_request_payload: Dict[str, Any] | None = None
last_response_id: str | None = None
last_response_items: List[Dict[str, Any]] = field(default_factory=list)
inflight_request_payload: Dict[str, Any] | None = None
inflight_track_result: bool = False
inflight_response_id: str | None = None
inflight_response_items: List[Dict[str, Any]] = field(default_factory=list)
def _canonicalize_first_user_message(input_items: List[Dict[str, Any]]) -> Dict[str, Any] | None:
@@ -70,6 +91,61 @@ def _remember(fp: str, sid: str) -> None:
_FINGERPRINT_TO_UUID.pop(oldest, None)
def _remember_responses_session(session_id: str) -> _ResponsesSessionState:
state = _RESPONSES_SESSION_STATE.get(session_id)
if state is None:
state = _ResponsesSessionState()
_RESPONSES_SESSION_STATE[session_id] = state
_RESPONSES_ORDER.append(session_id)
if len(_RESPONSES_ORDER) > _MAX_ENTRIES:
oldest = _RESPONSES_ORDER.pop(0)
_RESPONSES_SESSION_STATE.pop(oldest, None)
return state
def _request_without_input(payload: Dict[str, Any]) -> Dict[str, Any]:
clone = copy.deepcopy(payload)
clone["input"] = []
clone.pop("previous_response_id", None)
return clone
def _input_list(payload: Dict[str, Any]) -> List[Dict[str, Any]] | None:
raw = payload.get("input")
if not isinstance(raw, list):
return None
return [item for item in copy.deepcopy(raw) if isinstance(item, dict)]
def _conversation_output_items(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
reusable: List[Dict[str, Any]] = []
for item in items:
if not isinstance(item, dict):
continue
item_type = item.get("type")
if item_type == "reasoning":
continue
reusable.append(copy.deepcopy(item))
return reusable
def _clear_reuse_state(state: _ResponsesSessionState) -> None:
state.last_request_payload = None
state.last_response_id = None
state.last_response_items = []
state.inflight_request_payload = None
state.inflight_track_result = False
state.inflight_response_id = None
state.inflight_response_items = []
def _clear_inflight(state: _ResponsesSessionState) -> None:
state.inflight_request_payload = None
state.inflight_track_result = False
state.inflight_response_id = None
state.inflight_response_items = []
def ensure_session_id(
instructions: str | None,
input_items: List[Dict[str, Any]],
@@ -87,3 +163,150 @@ def ensure_session_id(
_remember(fp, sid)
return sid
def prepare_responses_request_for_session(
session_id: str,
payload: Dict[str, Any],
*,
allow_previous_response_id: bool = True,
) -> PreparedResponsesRequest:
full_payload = copy.deepcopy(payload)
outbound_payload = copy.deepcopy(payload)
explicit_previous_response_id = (
isinstance(full_payload.get("previous_response_id"), str)
and bool(full_payload.get("previous_response_id").strip())
)
with _LOCK:
state = _remember_responses_session(session_id)
if explicit_previous_response_id:
_clear_reuse_state(state)
return PreparedResponsesRequest(
payload=outbound_payload,
session_id=session_id,
)
request_input = _input_list(full_payload)
if (
allow_previous_response_id
and
state.last_request_payload is not None
and state.last_response_id
and request_input is not None
and _request_without_input(state.last_request_payload) == _request_without_input(full_payload)
):
baseline: List[Dict[str, Any]] = []
previous_input = _input_list(state.last_request_payload)
if previous_input is not None:
baseline.extend(previous_input)
baseline.extend(copy.deepcopy(state.last_response_items))
baseline_len = len(baseline)
if request_input[:baseline_len] == baseline and baseline_len <= len(request_input):
outbound_payload["input"] = copy.deepcopy(request_input[baseline_len:])
outbound_payload["previous_response_id"] = state.last_response_id
state.inflight_request_payload = full_payload
state.inflight_track_result = True
state.inflight_response_id = None
state.inflight_response_items = []
return PreparedResponsesRequest(
payload=outbound_payload,
session_id=session_id,
)
def note_responses_stream_event(session_id: str, event: Dict[str, Any]) -> None:
if not isinstance(session_id, str) or not session_id.strip():
return
if not isinstance(event, dict):
return
with _LOCK:
state = _RESPONSES_SESSION_STATE.get(session_id)
if state is None:
return
kind = event.get("type")
if kind == "response.created":
response = event.get("response")
if isinstance(response, dict) and isinstance(response.get("id"), str):
state.inflight_response_id = response.get("id")
return
if kind == "response.output_item.done":
item = event.get("item")
if isinstance(item, dict):
state.inflight_response_items.append(copy.deepcopy(item))
return
if kind == "response.completed":
response = event.get("response")
response_id = None
response_items: List[Dict[str, Any]] = copy.deepcopy(state.inflight_response_items)
if isinstance(response, dict):
if isinstance(response.get("id"), str):
response_id = response.get("id")
output = response.get("output")
if isinstance(output, list) and output:
response_items = [copy.deepcopy(item) for item in output if isinstance(item, dict)]
if not response_id:
response_id = state.inflight_response_id
if state.inflight_track_result and state.inflight_request_payload is not None and response_id:
state.last_request_payload = copy.deepcopy(state.inflight_request_payload)
state.last_response_id = response_id
state.last_response_items = _conversation_output_items(response_items)
else:
state.last_request_payload = None
state.last_response_id = None
state.last_response_items = []
_clear_inflight(state)
return
if kind in ("response.failed", "error"):
_clear_reuse_state(state)
def note_responses_final_response(session_id: str, response_obj: Dict[str, Any]) -> None:
if not isinstance(session_id, str) or not session_id.strip():
return
if not isinstance(response_obj, dict):
return
with _LOCK:
state = _RESPONSES_SESSION_STATE.get(session_id)
if state is None:
return
response_id = response_obj.get("id") if isinstance(response_obj.get("id"), str) else None
output = response_obj.get("output")
output_items = [copy.deepcopy(item) for item in output if isinstance(item, dict)] if isinstance(output, list) else []
if state.inflight_track_result and state.inflight_request_payload is not None and response_id:
state.last_request_payload = copy.deepcopy(state.inflight_request_payload)
state.last_response_id = response_id
state.last_response_items = _conversation_output_items(output_items)
else:
state.last_request_payload = None
state.last_response_id = None
state.last_response_items = []
_clear_inflight(state)
def clear_responses_reuse_state(session_id: str) -> None:
if not isinstance(session_id, str) or not session_id.strip():
return
with _LOCK:
state = _RESPONSES_SESSION_STATE.get(session_id)
if state is None:
return
_clear_reuse_state(state)
def reset_session_state() -> None:
with _LOCK:
_FINGERPRINT_TO_UUID.clear()
_ORDER.clear()
_RESPONSES_SESSION_STATE.clear()
_RESPONSES_ORDER.clear()

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
import json
import time
from typing import Any, Dict, List, Tuple
from urllib.parse import urlparse, urlunparse
import requests
from flask import Response, current_app, jsonify, make_response
@@ -33,6 +34,7 @@ def start_upstream_request(
tool_choice: Any | None = None,
parallel_tool_calls: bool = False,
reasoning_param: Dict[str, Any] | None = None,
service_tier: str | None = None,
):
access_token, account_id = get_effective_chatgpt_auth()
if not access_token or not account_id:
@@ -81,6 +83,62 @@ def start_upstream_request(
if reasoning_param is not None:
responses_payload["reasoning"] = reasoning_param
if isinstance(service_tier, str) and service_tier.strip():
responses_payload["service_tier"] = service_tier.strip().lower()
return start_upstream_raw_request(
responses_payload,
session_id=session_id,
stream=True,
)
def build_upstream_headers(
access_token: str,
account_id: str,
session_id: str,
*,
accept: str = "text/event-stream",
) -> Dict[str, str]:
return {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"Accept": accept,
"chatgpt-account-id": account_id,
"OpenAI-Beta": "responses=experimental",
"session_id": session_id,
}
def start_upstream_raw_request(
responses_payload: Dict[str, Any],
*,
session_id: str | None = None,
stream: bool = True,
):
access_token, account_id = get_effective_chatgpt_auth()
if not access_token or not account_id:
resp = make_response(
jsonify(
{
"error": {
"message": "Missing ChatGPT credentials. Run 'python3 chatmock.py login' first.",
}
}
),
401,
)
for k, v in build_cors_headers().items():
resp.headers.setdefault(k, v)
return None, resp
effective_session_id = session_id
if not isinstance(effective_session_id, str) or not effective_session_id.strip():
payload_prompt_cache_key = responses_payload.get("prompt_cache_key")
if isinstance(payload_prompt_cache_key, str) and payload_prompt_cache_key.strip():
effective_session_id = payload_prompt_cache_key.strip()
if not isinstance(effective_session_id, str) or not effective_session_id.strip():
effective_session_id = str(int(time.time() * 1000))
verbose = False
try:
@@ -90,21 +148,19 @@ def start_upstream_request(
if verbose:
_log_json("OUTBOUND >> ChatGPT Responses API payload", responses_payload)
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"Accept": "text/event-stream",
"chatgpt-account-id": account_id,
"OpenAI-Beta": "responses=experimental",
"session_id": session_id,
}
headers = build_upstream_headers(
access_token,
account_id,
effective_session_id,
accept=("text/event-stream" if stream else "application/json"),
)
try:
upstream = requests.post(
CHATGPT_RESPONSES_URL,
headers=headers,
json=responses_payload,
stream=True,
stream=stream,
timeout=600,
)
except requests.RequestException as e:
@@ -113,3 +169,13 @@ def start_upstream_request(
resp.headers.setdefault(k, v)
return None, resp
return upstream, None
def build_upstream_websocket_url() -> str:
parsed = urlparse(CHATGPT_RESPONSES_URL)
scheme = parsed.scheme.lower()
if scheme == "https":
parsed = parsed._replace(scheme="wss")
elif scheme == "http":
parsed = parsed._replace(scheme="ws")
return urlunparse(parsed)

View File

@@ -1,4 +1,4 @@
from __future__ import annotations
__version__ = "1.36"
__version__ = "1.37"

View File

@@ -0,0 +1,225 @@
from __future__ import annotations
import json
import os
import ssl
from typing import Any, Dict
import certifi
from flask import current_app, request
from flask_sock import Sock
from websockets.sync.client import connect as websocket_connect
from websockets.exceptions import ConnectionClosed
from .responses_api import (
ResponsesRequestError,
extract_client_session_id,
normalize_responses_payload,
)
from .session import (
clear_responses_reuse_state,
note_responses_stream_event,
prepare_responses_request_for_session,
)
from .upstream import build_upstream_headers, build_upstream_websocket_url
from .utils import get_effective_chatgpt_auth
def _log_json(prefix: str, payload: Any) -> None:
try:
print(f"{prefix}\n{json.dumps(payload, indent=2, ensure_ascii=False)}")
except Exception:
try:
print(f"{prefix}\n{payload}")
except Exception:
pass
def _error_event(message: str, *, status_code: int = 400, code: str | None = None) -> Dict[str, Any]:
error: Dict[str, Any] = {"message": message}
if code:
error["code"] = code
return {"type": "error", "status_code": status_code, "error": error}
def _is_terminal_event(event: Any) -> bool:
if not isinstance(event, dict):
return False
kind = event.get("type")
return kind in ("response.completed", "response.failed", "error")
def _build_websocket_ssl_context() -> ssl.SSLContext:
cafile = (
os.getenv("CODEX_CA_CERTIFICATE")
or os.getenv("SSL_CERT_FILE")
or certifi.where()
)
return ssl.create_default_context(cafile=cafile)
def connect_upstream_websocket(url: str, headers: Dict[str, str]):
return websocket_connect(
url,
additional_headers=headers,
open_timeout=15,
ssl=_build_websocket_ssl_context(),
)
def register_websocket_routes(sock: Sock) -> None:
@sock.route("/v1/responses")
def responses_websocket(ws) -> None:
verbose = bool(current_app.config.get("VERBOSE"))
upstream_ws = None
upstream_session_id: str | None = None
active_session_id: str | None = None
def _send_error(message: str, *, status_code: int = 400, code: str | None = None) -> None:
evt = _error_event(message, status_code=status_code, code=code)
if verbose:
_log_json("STREAM OUT WS /v1/responses (error)", evt)
try:
ws.send(json.dumps(evt))
except Exception:
pass
try:
while True:
incoming = ws.receive()
if incoming is None:
break
if isinstance(incoming, bytes):
incoming_text = incoming.decode("utf-8", errors="ignore")
else:
incoming_text = str(incoming)
if verbose:
print("IN WS /v1/responses\n" + incoming_text)
try:
payload = json.loads(incoming_text)
except Exception:
_send_error("Websocket frames must be valid JSON objects.", status_code=400)
break
if not isinstance(payload, dict):
_send_error("Websocket frames must be JSON objects.", status_code=400)
break
client_session_id = extract_client_session_id(request.headers)
outbound_text = incoming_text
session_id = upstream_session_id
if payload.get("type") == "response.create":
try:
normalized = normalize_responses_payload(
payload,
config=current_app.config,
client_session_id=client_session_id,
)
except ResponsesRequestError as exc:
_send_error(str(exc), status_code=exc.status_code, code=exc.code)
continue
if normalized.service_tier_resolution.warning_message and verbose:
print(f"[FastMode] {normalized.service_tier_resolution.warning_message}")
prepared = prepare_responses_request_for_session(
normalized.session_id,
normalized.payload,
allow_previous_response_id=True,
)
outbound_text = json.dumps(prepared.payload)
session_id = normalized.session_id
active_session_id = normalized.session_id
if verbose:
_log_json("OUTBOUND >> ChatGPT Responses WS payload", prepared.payload)
elif upstream_ws is None:
_send_error(
"The first websocket message must be a response.create request.",
status_code=400,
)
break
if upstream_ws is None or (session_id and session_id != upstream_session_id):
access_token, account_id = get_effective_chatgpt_auth()
if not access_token or not account_id:
if session_id:
clear_responses_reuse_state(session_id)
_send_error(
"Missing ChatGPT credentials. Run 'python3 chatmock.py login' first.",
status_code=401,
)
break
if upstream_ws is not None:
try:
upstream_ws.close()
except Exception:
pass
effective_session_id = session_id or client_session_id or ""
try:
upstream_ws = connect_upstream_websocket(
build_upstream_websocket_url(),
build_upstream_headers(
access_token,
account_id,
effective_session_id,
accept="application/json",
),
)
except Exception as exc:
if session_id:
clear_responses_reuse_state(session_id)
_send_error(
f"Upstream websocket connection failed: {exc}",
status_code=502,
)
break
upstream_session_id = effective_session_id
upstream_ws.send(outbound_text)
while True:
try:
upstream_message = upstream_ws.recv()
except ConnectionClosed:
if active_session_id:
clear_responses_reuse_state(active_session_id)
_send_error("Upstream websocket closed unexpectedly.", status_code=502)
return
if upstream_message is None:
if active_session_id:
clear_responses_reuse_state(active_session_id)
_send_error("Upstream websocket closed unexpectedly.", status_code=502)
return
if verbose:
try:
print("STREAM OUT WS /v1/responses\n" + str(upstream_message))
except Exception:
pass
ws.send(upstream_message)
try:
parsed = json.loads(upstream_message)
except Exception:
parsed = None
if isinstance(parsed, dict) and active_session_id:
note_responses_stream_event(active_session_id, parsed)
if _is_terminal_event(parsed):
if isinstance(parsed, dict) and parsed.get("type") in ("response.failed", "error"):
if upstream_ws is not None:
try:
upstream_ws.close()
except Exception:
pass
upstream_ws = None
upstream_session_id = None
break
finally:
if upstream_ws is not None:
try:
upstream_ws.close()
except Exception:
pass

View File

@@ -23,6 +23,9 @@ if [[ "$cmd" == "serve" ]]; then
if bool "${VERBOSE_OBFUSCATION:-}" || bool "${CHATGPT_LOCAL_VERBOSE_OBFUSCATION:-}"; then
ARGS+=(--verbose-obfuscation)
fi
if bool "${FAST_MODE:-}" || bool "${CHATGPT_LOCAL_FAST_MODE:-}"; then
ARGS+=(--fast-mode)
fi
if [[ "$#" -gt 0 ]]; then
ARGS+=("$@")

15
gui.py
View File

@@ -18,6 +18,7 @@ def run_server(
reasoning_effort: str = "medium",
reasoning_summary: str = "auto",
reasoning_compat: str = "think-tags",
fast_mode: bool = False,
expose_reasoning_models: bool = False,
default_web_search: bool = False,
) -> None:
@@ -25,6 +26,7 @@ def run_server(
reasoning_effort=reasoning_effort,
reasoning_summary=reasoning_summary,
reasoning_compat=reasoning_compat,
fast_mode=fast_mode,
expose_reasoning_models=expose_reasoning_models,
default_web_search=default_web_search,
)
@@ -42,6 +44,7 @@ class ServerProcess(QtCore.QObject):
self._effort = "medium"
self._summary = "auto"
self._compat = "think-tags"
self._fast_mode = False
self._expose_reasoning_models = False
self._default_web_search = False
@@ -55,6 +58,7 @@ class ServerProcess(QtCore.QObject):
effort: str,
summary: str,
compat: str,
fast_mode: bool,
expose_reasoning_models: bool,
default_web_search: bool,
) -> None:
@@ -63,6 +67,7 @@ class ServerProcess(QtCore.QObject):
self._host, self._port = host, port
self._effort, self._summary = effort, summary
self._compat = compat
self._fast_mode = fast_mode
self._expose_reasoning_models = expose_reasoning_models
self._default_web_search = default_web_search
self._proc = QtCore.QProcess()
@@ -75,6 +80,8 @@ class ServerProcess(QtCore.QObject):
"--summary", summary,
"--compat", compat,
]
if fast_mode:
args.append("--fast-mode")
if expose_reasoning_models:
args.append("--expose-reasoning-models")
if default_web_search:
@@ -352,8 +359,10 @@ class MainWindow(QtWidgets.QMainWindow):
opts.addWidget(self.compat, 1, 1)
self.expose_reasoning_models = QtWidgets.QCheckBox("Expose reasoning models")
opts.addWidget(self.expose_reasoning_models, 1, 2)
self.fast_mode = QtWidgets.QCheckBox("Enable fast mode")
opts.addWidget(self.fast_mode, 1, 3)
self.enable_web_search = QtWidgets.QCheckBox("Enable web search")
opts.addWidget(self.enable_web_search, 1, 3)
opts.addWidget(self.enable_web_search, 2, 0)
opts.setColumnStretch(1, 1)
opts.setColumnStretch(3, 1)
srv_layout.addLayout(opts)
@@ -463,6 +472,7 @@ class MainWindow(QtWidgets.QMainWindow):
effort = self.effort.currentText().strip()
summary = self.summary.currentText().strip()
compat = self.compat.currentText().strip()
fast_mode = self.fast_mode.isChecked()
expose_reasoning_models = self.expose_reasoning_models.isChecked()
default_web_search = self.enable_web_search.isChecked()
self.status.setText(f"Starting server at http://{host}:{port}")
@@ -473,6 +483,7 @@ class MainWindow(QtWidgets.QMainWindow):
effort,
summary,
compat,
fast_mode,
expose_reasoning_models,
default_web_search,
)
@@ -524,6 +535,7 @@ def main() -> None:
p.add_argument("--effort", default="medium")
p.add_argument("--summary", default="auto")
p.add_argument("--compat", default="think-tags")
p.add_argument("--fast-mode", action="store_true")
p.add_argument("--expose-reasoning-models", action="store_true")
p.add_argument("--enable-web-search", action="store_true")
args, _ = p.parse_known_args()
@@ -533,6 +545,7 @@ def main() -> None:
args.effort,
args.summary,
args.compat,
args.fast_mode,
args.expose_reasoning_models,
args.enable_web_search,
)

View File

@@ -11,12 +11,14 @@ dependencies = [
"blinker==1.9.0",
"certifi==2025.8.3",
"flask==3.1.1",
"flask-sock==0.7.0",
"idna==3.10",
"itsdangerous==2.2.0",
"jinja2==3.1.6",
"markupsafe==3.0.2",
"requests==2.32.5",
"urllib3==2.5.0",
"websockets==15.0.1",
"werkzeug==3.1.3",
]

View File

@@ -0,0 +1,176 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import sys
import uuid
from typing import Any, Dict
import requests
def _post(url: str, api_key: str, session_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
response = requests.post(
url,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"X-Session-Id": session_id,
},
json=payload,
timeout=180,
)
try:
body = response.json()
except Exception:
body = {"raw": response.text}
if response.status_code >= 400:
raise RuntimeError(
f"POST {url} failed with {response.status_code}: {json.dumps(body, ensure_ascii=False)}"
)
if not isinstance(body, dict):
raise RuntimeError(f"Expected JSON object response, got: {body!r}")
return body
def _usage_summary(body: Dict[str, Any]) -> Dict[str, Any]:
usage = body.get("usage")
if not isinstance(usage, dict):
return {}
return usage
def _cached_tokens(body: Dict[str, Any]) -> int | None:
usage = _usage_summary(body)
details = usage.get("input_tokens_details")
if not isinstance(details, dict):
return None
value = details.get("cached_tokens")
try:
return int(value)
except Exception:
return None
def _assistant_message_item(body: Dict[str, Any]) -> Dict[str, Any]:
output = body.get("output")
if not isinstance(output, list):
raise RuntimeError("Response did not include an output list.")
for item in output:
if isinstance(item, dict) and item.get("type") == "message" and item.get("role") == "assistant":
return item
raise RuntimeError("Response did not include an assistant message item.")
def _user_message(text: str) -> Dict[str, Any]:
return {
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": text}],
}
def _default_prefix() -> str:
seed = "Cache test prefix. Repeat this context exactly for cache measurement. "
return "".join(seed for _ in range(220))
def main() -> int:
parser = argparse.ArgumentParser(
description="Drive two raw /v1/responses turns through ChatMock and check cached input tokens."
)
parser.add_argument("--base-url", default="http://127.0.0.1:8000", help="ChatMock base URL.")
parser.add_argument("--api-key", default="key", help="Bearer token to send to ChatMock.")
parser.add_argument("--model", default="gpt-5.4", help="Model to request.")
parser.add_argument(
"--session-id",
default=f"cache-check-{uuid.uuid4()}",
help="Fixed X-Session-Id for both turns.",
)
parser.add_argument(
"--prefix",
default=_default_prefix(),
help="Large repeated first-turn prompt prefix.",
)
parser.add_argument(
"--first-question",
default="Reply with exactly: alpha",
help="Trailing instruction for the first turn.",
)
parser.add_argument(
"--second-question",
default="Reply with exactly: beta",
help="Trailing instruction for the second turn.",
)
args = parser.parse_args()
responses_url = args.base_url.rstrip("/") + "/v1/responses"
session_id = args.session_id
first_text = f"{args.prefix}\n\n{args.first_question}"
second_text = args.second_question
print(f"Using session id: {session_id}")
print(f"POST target: {responses_url}")
print("This checks the raw Responses usage object returned through ChatMock.")
print()
first_payload = {
"model": args.model,
"store": False,
"stream": False,
"input": first_text,
}
first_response = _post(responses_url, args.api_key, session_id, first_payload)
assistant_item = _assistant_message_item(first_response)
second_payload = {
"model": args.model,
"store": False,
"stream": False,
"input": [
_user_message(first_text),
assistant_item,
_user_message(second_text),
],
}
second_response = _post(responses_url, args.api_key, session_id, second_payload)
first_usage = _usage_summary(first_response)
second_usage = _usage_summary(second_response)
first_cached = _cached_tokens(first_response)
second_cached = _cached_tokens(second_response)
print("Turn 1")
print(json.dumps(first_usage, indent=2, ensure_ascii=False) if first_usage else " no usage object")
print()
print("Turn 2")
print(json.dumps(second_usage, indent=2, ensure_ascii=False) if second_usage else " no usage object")
print()
if second_cached is None:
first_input_tokens = first_usage.get("input_tokens") if isinstance(first_usage, dict) else None
second_input_tokens = second_usage.get("input_tokens") if isinstance(second_usage, dict) else None
print("Result: inconclusive")
print("Reason: upstream did not include `usage.input_tokens_details.cached_tokens`.")
if isinstance(first_input_tokens, int) and isinstance(second_input_tokens, int):
print(f"Observed input_tokens delta: first={first_input_tokens}, second={second_input_tokens}")
print("Codex treats cached-token reporting as the direct cache-hit signal; without it, this script cannot prove caching.")
return 2
if second_cached > 0:
print(f"Result: success, follow-up turn reported cached_tokens={second_cached}.")
return 0
print("Result: failure, follow-up turn reported cached_tokens=0.")
return 1
if __name__ == "__main__":
try:
raise SystemExit(main())
except KeyboardInterrupt:
raise SystemExit(130)
except Exception as exc:
print(f"error: {exc}", file=sys.stderr)
raise SystemExit(1)

View File

@@ -0,0 +1,143 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import sys
import uuid
from typing import Any, Dict, Tuple
from websockets.sync.client import connect
def _user_message(text: str) -> Dict[str, Any]:
return {
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": text}],
}
def _receive_turn(ws) -> Tuple[str, Dict[str, Any]]:
response_id: str | None = None
assistant_item: Dict[str, Any] | None = None
while True:
raw = ws.recv(timeout=120)
event = json.loads(raw)
event_type = event.get("type")
if event_type == "error":
raise RuntimeError(f"websocket error: {json.dumps(event, ensure_ascii=False)}")
if event_type == "response.created":
response = event.get("response")
if isinstance(response, dict) and isinstance(response.get("id"), str):
response_id = response["id"]
elif event_type == "response.output_item.done":
item = event.get("item")
if (
isinstance(item, dict)
and item.get("type") == "message"
and item.get("role") == "assistant"
):
assistant_item = item
elif event_type == "response.completed":
if not response_id:
response = event.get("response")
if isinstance(response, dict) and isinstance(response.get("id"), str):
response_id = response["id"]
if not response_id:
raise RuntimeError("turn completed without a response id")
if assistant_item is None:
raise RuntimeError("turn completed without an assistant message item")
return response_id, assistant_item
def main() -> int:
parser = argparse.ArgumentParser(
description="Exercise ChatMock websocket reuse the same way Codex does."
)
parser.add_argument(
"--ws-url",
default="ws://127.0.0.1:8000/v1/responses",
help="ChatMock websocket URL.",
)
parser.add_argument("--model", default="gpt-5.4", help="Model to request.")
parser.add_argument(
"--session-id",
default=f"reuse-demo-{uuid.uuid4()}",
help="Fixed X-Session-Id for the whole run.",
)
parser.add_argument(
"--first-prompt",
default="Say exactly: alpha",
help="Prompt for the first turn.",
)
parser.add_argument(
"--second-prompt",
default="Now say exactly: beta",
help="Prompt appended in the reuse-candidate turn.",
)
parser.add_argument(
"--no-fast-mode",
action="store_true",
help="Do not send fast_mode=true.",
)
args = parser.parse_args()
headers = {"X-Session-Id": args.session_id}
fast_mode = not args.no_fast_mode
print(f"Using websocket session id: {args.session_id}")
print(f"Connecting to: {args.ws_url}")
print("Run ChatMock with `python3 chatmock.py serve --verbose` in another terminal.")
print("This verifies the Codex-aligned path: websocket `response.create` reuse.")
print("HTTP `/v1/responses` is not expected to send `previous_response_id`.")
print()
with connect(args.ws_url, additional_headers=headers, open_timeout=15) as ws:
first_request = {
"type": "response.create",
"model": args.model,
"store": False,
"input": args.first_prompt,
"fast_mode": fast_mode,
}
ws.send(json.dumps(first_request))
first_response_id, assistant_item = _receive_turn(ws)
second_request = {
"type": "response.create",
"model": args.model,
"store": False,
"input": [
_user_message(args.first_prompt),
assistant_item,
_user_message(args.second_prompt),
],
"fast_mode": fast_mode,
}
ws.send(json.dumps(second_request))
second_response_id, _ = _receive_turn(ws)
print("Turn 1 completed.")
print(f" response id: {first_response_id}")
print("Turn 2 completed.")
print(f" response id: {second_response_id}")
print()
print("Expected in the verbose ChatMock server log for turn 2:")
print(" - outbound websocket payload includes `previous_response_id`")
print(" - `previous_response_id` equals the first response id")
print(" - outbound `input` only contains the new trailing user message")
print()
print("If turn 2 still shows the full conversation in the outbound websocket payload, reuse is not working.")
return 0
if __name__ == "__main__":
try:
raise SystemExit(main())
except KeyboardInterrupt:
raise SystemExit(130)
except Exception as exc:
print(f"error: {exc}", file=sys.stderr)
raise SystemExit(1)

49
tests/test_fast_mode.py Normal file
View File

@@ -0,0 +1,49 @@
from __future__ import annotations
import unittest
from chatmock.fast_mode import parse_optional_bool, resolve_service_tier, supports_priority_service_tier
class FastModeTests(unittest.TestCase):
def test_parse_optional_bool(self) -> None:
self.assertTrue(parse_optional_bool(True))
self.assertTrue(parse_optional_bool("true"))
self.assertFalse(parse_optional_bool(False))
self.assertFalse(parse_optional_bool("off"))
self.assertIsNone(parse_optional_bool("maybe"))
def test_priority_allowlist_uses_normalized_model_ids(self) -> None:
self.assertTrue(supports_priority_service_tier("gpt5.4"))
self.assertFalse(supports_priority_service_tier("gpt-5.3-codex"))
def test_explicit_fast_mode_true_errors_for_unsupported_model(self) -> None:
resolution = resolve_service_tier(
"gpt-5.3-codex",
request_fast_mode=True,
server_fast_mode=False,
)
self.assertIsNone(resolution.service_tier)
self.assertIsNotNone(resolution.error_message)
def test_server_default_fast_mode_falls_back_on_unsupported_model(self) -> None:
resolution = resolve_service_tier(
"gpt-5.3-codex",
server_fast_mode=True,
)
self.assertIsNone(resolution.service_tier)
self.assertIsNone(resolution.error_message)
self.assertIsNotNone(resolution.warning_message)
def test_request_fast_mode_false_overrides_server_default(self) -> None:
resolution = resolve_service_tier(
"gpt-5.4",
request_fast_mode=False,
server_fast_mode=True,
)
self.assertIsNone(resolution.service_tier)
self.assertIsNone(resolution.error_message)
if __name__ == "__main__":
unittest.main()

View File

@@ -10,6 +10,7 @@ class ModelRegistryTests(unittest.TestCase):
self.assertEqual(normalize_model_name("gpt5"), "gpt-5")
self.assertEqual(normalize_model_name("gpt5.4"), "gpt-5.4")
self.assertEqual(normalize_model_name("gpt5.4-mini"), "gpt-5.4-mini")
self.assertEqual(normalize_model_name("gpt5.3-codex-spark"), "gpt-5.3-codex-spark")
self.assertEqual(normalize_model_name("codex"), "codex-mini-latest")
def test_strips_reasoning_suffixes(self) -> None:
@@ -28,6 +29,7 @@ class ModelRegistryTests(unittest.TestCase):
model_ids = list_public_models(expose_reasoning_models=True)
self.assertIn("gpt-5.4", model_ids)
self.assertIn("gpt-5.4-mini", model_ids)
self.assertIn("gpt-5.3-codex-spark", model_ids)
self.assertIn("gpt-5.4-none", model_ids)
self.assertIn("gpt-5.4-mini-xhigh", model_ids)
self.assertNotIn("gpt-5.4-mini-none", model_ids)

View File

@@ -1,31 +1,56 @@
from __future__ import annotations
import json
import socket
import threading
import time
import unittest
from unittest.mock import patch
from chatmock.app import create_app
from chatmock.session import reset_session_state
from websockets.sync.client import connect as ws_connect
class FakeUpstream:
def __init__(self, events: list[dict[str, object]], status_code: int = 200) -> None:
def __init__(
self,
events: list[dict[str, object]] | None = None,
*,
status_code: int = 200,
headers: dict[str, str] | None = None,
content: bytes | None = None,
text: str = "",
) -> None:
self._events = events
self.status_code = status_code
self.headers = {}
self.content = b""
self.text = ""
self.headers = headers or {}
self.content = content or b""
self.text = text
def iter_lines(self, decode_unicode: bool = False):
for event in self._events:
for event in self._events or []:
payload = f"data: {json.dumps(event)}"
yield payload if decode_unicode else payload.encode("utf-8")
def iter_content(self, chunk_size=None):
if self.content:
yield self.content
return
for event in self._events or []:
payload = f"data: {json.dumps(event)}\n\n".encode("utf-8")
yield payload
def json(self):
return json.loads(self.content.decode("utf-8"))
def close(self) -> None:
return None
class RouteTests(unittest.TestCase):
def setUp(self) -> None:
reset_session_state()
self.app = create_app()
self.client = self.app.test_client()
@@ -36,6 +61,7 @@ class RouteTests(unittest.TestCase):
model_ids = [item["id"] for item in body["data"]]
self.assertIn("gpt-5.4", model_ids)
self.assertIn("gpt-5.4-mini", model_ids)
self.assertIn("gpt-5.3-codex-spark", model_ids)
def test_ollama_tags_list(self) -> None:
response = self.client.get("/api/tags")
@@ -85,6 +111,443 @@ class RouteTests(unittest.TestCase):
self.assertEqual(body["message"]["content"], "hello")
self.assertEqual(body["model"], "gpt-5.4")
@patch("chatmock.routes_openai.start_upstream_request")
def test_chat_completions_fast_mode_sets_priority_service_tier(self, mock_start) -> None:
mock_start.return_value = (
FakeUpstream(
[
{"type": "response.output_text.delta", "delta": "hello"},
{"type": "response.completed", "response": {"id": "resp-openai"}},
]
),
None,
)
response = self.client.post(
"/v1/chat/completions",
json={
"model": "gpt-5.4",
"fast_mode": True,
"messages": [{"role": "user", "content": "hi"}],
},
)
self.assertEqual(response.status_code, 200)
self.assertEqual(mock_start.call_args.kwargs["service_tier"], "priority")
@patch("chatmock.routes_openai.start_upstream_request")
def test_chat_completions_fast_mode_false_overrides_server_default(self, mock_start) -> None:
app = create_app(fast_mode=True)
client = app.test_client()
mock_start.return_value = (
FakeUpstream(
[
{"type": "response.output_text.delta", "delta": "hello"},
{"type": "response.completed", "response": {"id": "resp-openai"}},
]
),
None,
)
response = client.post(
"/v1/chat/completions",
json={
"model": "gpt-5.4",
"fast_mode": False,
"messages": [{"role": "user", "content": "hi"}],
},
)
self.assertEqual(response.status_code, 200)
self.assertIsNone(mock_start.call_args.kwargs["service_tier"])
@patch("chatmock.routes_openai.start_upstream_request")
def test_chat_completions_rejects_unsupported_explicit_fast_mode(self, mock_start) -> None:
response = self.client.post(
"/v1/chat/completions",
json={
"model": "gpt-5.3-codex",
"fast_mode": True,
"messages": [{"role": "user", "content": "hi"}],
},
)
body = response.get_json()
self.assertEqual(response.status_code, 400)
self.assertIn("Fast mode is not supported", body["error"]["message"])
mock_start.assert_not_called()
@patch("chatmock.routes_openai.start_upstream_raw_request")
def test_responses_route_returns_completed_response_object(self, mock_start) -> None:
mock_start.return_value = (
FakeUpstream(
[
{
"type": "response.created",
"response": {"id": "resp_123", "object": "response", "status": "in_progress"},
},
{
"type": "response.completed",
"response": {
"id": "resp_123",
"object": "response",
"status": "completed",
"output": [],
},
},
],
headers={"Content-Type": "text/event-stream"},
),
None,
)
response = self.client.post(
"/v1/responses",
json={"model": "gpt5.4-mini", "input": "hello"},
)
body = response.get_json()
self.assertEqual(response.status_code, 200)
self.assertEqual(body["id"], "resp_123")
outbound_payload = mock_start.call_args.args[0]
self.assertEqual(outbound_payload["model"], "gpt-5.4-mini")
self.assertEqual(outbound_payload["store"], False)
self.assertEqual(
outbound_payload["input"],
[{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "hello"}]}],
)
self.assertEqual(outbound_payload["reasoning"]["effort"], "medium")
self.assertIsInstance(outbound_payload["prompt_cache_key"], str)
@patch("chatmock.routes_openai.start_upstream_raw_request")
def test_responses_route_does_not_use_previous_response_id_for_http_follow_up(self, mock_start) -> None:
mock_start.side_effect = [
(
FakeUpstream(
[
{
"type": "response.created",
"response": {"id": "resp_1", "object": "response", "status": "in_progress"},
},
{
"type": "response.output_item.done",
"item": {
"type": "message",
"role": "assistant",
"id": "msg_1",
"content": [{"type": "output_text", "text": "assistant output"}],
},
},
{
"type": "response.completed",
"response": {"id": "resp_1", "object": "response", "status": "completed", "output": []},
},
],
headers={"Content-Type": "text/event-stream"},
),
None,
),
(
FakeUpstream(
[
{
"type": "response.created",
"response": {"id": "resp_2", "object": "response", "status": "in_progress"},
},
{
"type": "response.completed",
"response": {"id": "resp_2", "object": "response", "status": "completed", "output": []},
},
],
headers={"Content-Type": "text/event-stream"},
),
None,
),
]
first = self.client.post("/v1/responses", json={"model": "gpt-5.4", "input": "hello"})
second = self.client.post(
"/v1/responses",
json={
"model": "gpt-5.4",
"input": [
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "hello"}]},
{"type": "message", "role": "assistant", "id": "msg_1", "content": [{"type": "output_text", "text": "assistant output"}]},
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "second"}]},
],
},
)
self.assertEqual(first.status_code, 200)
self.assertEqual(second.status_code, 200)
outbound_payload = mock_start.call_args_list[1].args[0]
self.assertNotIn("previous_response_id", outbound_payload)
self.assertEqual(
outbound_payload["input"],
[
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "hello"}]},
{"type": "message", "role": "assistant", "id": "msg_1", "content": [{"type": "output_text", "text": "assistant output"}]},
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "second"}]},
],
)
@patch("chatmock.routes_openai.start_upstream_raw_request")
def test_responses_route_falls_back_to_full_create_when_non_input_fields_change(self, mock_start) -> None:
mock_start.side_effect = [
(
FakeUpstream(
[
{
"type": "response.created",
"response": {"id": "resp_1", "object": "response", "status": "in_progress"},
},
{
"type": "response.completed",
"response": {"id": "resp_1", "object": "response", "status": "completed", "output": []},
},
],
headers={"Content-Type": "text/event-stream"},
),
None,
),
(
FakeUpstream(
[
{
"type": "response.created",
"response": {"id": "resp_2", "object": "response", "status": "in_progress"},
},
{
"type": "response.completed",
"response": {"id": "resp_2", "object": "response", "status": "completed", "output": []},
},
],
headers={"Content-Type": "text/event-stream"},
),
None,
),
]
headers = {"X-Session-Id": "session-fixed"}
first = self.client.post("/v1/responses", json={"model": "gpt-5.4", "input": "hello"}, headers=headers)
second = self.client.post(
"/v1/responses",
json={
"model": "gpt-5.4",
"instructions": "changed",
"input": [
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "hello"}]},
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "second"}]},
],
},
headers=headers,
)
self.assertEqual(first.status_code, 200)
self.assertEqual(second.status_code, 200)
outbound_payload = mock_start.call_args_list[1].args[0]
self.assertNotIn("previous_response_id", outbound_payload)
self.assertEqual(
outbound_payload["input"],
[
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "hello"}]},
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "second"}]},
],
)
@patch("chatmock.routes_openai.start_upstream_raw_request")
def test_responses_route_clears_reuse_state_after_error(self, mock_start) -> None:
mock_start.side_effect = [
(
FakeUpstream(
[
{"type": "response.created", "response": {"id": "resp_1"}},
{"type": "response.completed", "response": {"id": "resp_1", "output": []}},
],
headers={"Content-Type": "text/event-stream"},
),
None,
),
(
FakeUpstream(
[
{"type": "response.failed", "response": {"error": {"message": "boom"}}},
],
headers={"Content-Type": "text/event-stream"},
),
None,
),
(
FakeUpstream(
[
{"type": "response.created", "response": {"id": "resp_3"}},
{"type": "response.completed", "response": {"id": "resp_3", "output": []}},
],
headers={"Content-Type": "text/event-stream"},
),
None,
),
]
headers = {"X-Session-Id": "session-fixed"}
first = self.client.post("/v1/responses", json={"model": "gpt-5.4", "input": "hello"}, headers=headers)
second = self.client.post(
"/v1/responses",
json={
"model": "gpt-5.4",
"input": [
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "hello"}]},
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "second"}]},
],
},
headers=headers,
)
third = self.client.post(
"/v1/responses",
json={
"model": "gpt-5.4",
"input": [
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "hello"}]},
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "second"}]},
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "third"}]},
],
},
headers=headers,
)
self.assertEqual(first.status_code, 200)
self.assertEqual(second.status_code, 502)
self.assertEqual(third.status_code, 200)
outbound_payload = mock_start.call_args_list[2].args[0]
self.assertNotIn("previous_response_id", outbound_payload)
self.assertEqual(
outbound_payload["input"],
[
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "hello"}]},
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "second"}]},
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "third"}]},
],
)
@patch("chatmock.routes_openai.start_upstream_raw_request")
def test_responses_route_stream_passthrough(self, mock_start) -> None:
chunk = b'data: {"type":"response.output_text.delta","delta":"hello"}\n\n'
mock_start.return_value = (
FakeUpstream(
headers={"Content-Type": "text/event-stream"},
content=chunk,
),
None,
)
response = self.client.post(
"/v1/responses",
json={"model": "gpt-5.4", "input": "hello", "stream": True},
)
self.assertEqual(response.status_code, 200)
self.assertIn("response.output_text.delta", response.get_data(as_text=True))
@patch("chatmock.routes_openai.start_upstream_raw_request")
def test_responses_route_rejects_unsupported_explicit_priority(self, mock_start) -> None:
response = self.client.post(
"/v1/responses",
json={"model": "gpt-5.3-codex", "input": "hello", "service_tier": "priority"},
)
body = response.get_json()
self.assertEqual(response.status_code, 400)
self.assertIn("Fast mode is not supported", body["error"]["message"])
mock_start.assert_not_called()
@patch("chatmock.websocket_routes.get_effective_chatgpt_auth", return_value=("token", "acct"))
@patch("chatmock.websocket_routes.connect_upstream_websocket")
def test_responses_websocket_rewrites_response_create(self, mock_connect, _mock_auth) -> None:
class FakeUpstreamWebsocket:
def __init__(self) -> None:
self.sent: list[str] = []
self._messages = [
json.dumps({"type": "response.created", "response": {"id": "resp_ws_1"}}),
json.dumps({
"type": "response.output_item.done",
"item": {
"type": "message",
"role": "assistant",
"id": "msg_1",
"content": [{"type": "output_text", "text": "assistant output"}],
},
}),
json.dumps({"type": "response.completed", "response": {"id": "resp_ws_1"}}),
json.dumps({"type": "response.created", "response": {"id": "resp_ws_2"}}),
json.dumps({"type": "response.completed", "response": {"id": "resp_ws_2"}}),
]
def send(self, message: str) -> None:
self.sent.append(message)
def recv(self) -> str:
return self._messages.pop(0)
def close(self) -> None:
return None
fake_upstream = FakeUpstreamWebsocket()
mock_connect.return_value = fake_upstream
app = create_app()
sock = socket.socket()
sock.bind(("127.0.0.1", 0))
host, port = sock.getsockname()
sock.close()
server_thread = threading.Thread(
target=app.run,
kwargs={
"host": host,
"port": port,
"debug": False,
"use_reloader": False,
"threaded": True,
},
daemon=True,
)
server_thread.start()
time.sleep(0.5)
with ws_connect(f"ws://{host}:{port}/v1/responses") as client:
client.send(json.dumps({"type": "response.create", "model": "gpt-5.4", "input": "hello", "fast_mode": True}))
first = json.loads(client.recv())
assistant = json.loads(client.recv())
second = json.loads(client.recv())
client.send(
json.dumps(
{
"type": "response.create",
"model": "gpt-5.4",
"fast_mode": True,
"input": [
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "hello"}]},
{"type": "message", "role": "assistant", "id": "msg_1", "content": [{"type": "output_text", "text": "assistant output"}]},
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "second"}]},
],
}
)
)
third = json.loads(client.recv())
fourth = json.loads(client.recv())
self.assertEqual(first["type"], "response.created")
self.assertEqual(assistant["type"], "response.output_item.done")
self.assertEqual(second["type"], "response.completed")
self.assertEqual(third["type"], "response.created")
self.assertEqual(fourth["type"], "response.completed")
outbound = json.loads(fake_upstream.sent[0])
self.assertEqual(outbound["model"], "gpt-5.4")
self.assertEqual(outbound["service_tier"], "priority")
self.assertEqual(outbound["type"], "response.create")
self.assertEqual(
outbound["input"],
[{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "hello"}]}],
)
self.assertIn("prompt_cache_key", outbound)
follow_up = json.loads(fake_upstream.sent[1])
self.assertEqual(follow_up["previous_response_id"], "resp_ws_1")
self.assertEqual(
follow_up["input"],
[{"type": "message", "role": "user", "content": [{"type": "input_text", "text": "second"}]}],
)
if __name__ == "__main__":
unittest.main()

92
uv.lock generated
View File

@@ -109,12 +109,14 @@ dependencies = [
{ name = "blinker" },
{ name = "certifi" },
{ name = "flask" },
{ name = "flask-sock" },
{ name = "idna" },
{ name = "itsdangerous" },
{ name = "jinja2" },
{ name = "markupsafe" },
{ name = "requests" },
{ name = "urllib3" },
{ name = "websockets" },
{ name = "werkzeug" },
]
@@ -130,6 +132,7 @@ requires-dist = [
{ name = "blinker", specifier = "==1.9.0" },
{ name = "certifi", specifier = "==2025.8.3" },
{ name = "flask", specifier = "==3.1.1" },
{ name = "flask-sock", specifier = "==0.7.0" },
{ name = "idna", specifier = "==3.10" },
{ name = "itsdangerous", specifier = "==2.2.0" },
{ name = "jinja2", specifier = "==3.1.6" },
@@ -139,6 +142,7 @@ requires-dist = [
{ name = "pyside6", marker = "extra == 'gui'", specifier = "==6.9.2" },
{ name = "requests", specifier = "==2.32.5" },
{ name = "urllib3", specifier = "==2.5.0" },
{ name = "websockets", specifier = "==15.0.1" },
{ name = "werkzeug", specifier = "==3.1.3" },
]
provides-extras = ["gui"]
@@ -181,6 +185,28 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/3d/68/9d4508e893976286d2ead7f8f571314af6c2037af34853a30fd769c02e9d/flask-3.1.1-py3-none-any.whl", hash = "sha256:07aae2bb5eaf77993ef57e357491839f5fd9f4dc281593a81a9e4d79a24f295c", size = 103305 },
]
[[package]]
name = "flask-sock"
version = "0.7.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "flask" },
{ name = "simple-websocket" },
]
sdist = { url = "https://files.pythonhosted.org/packages/8d/8f/c6ab717dc90f4e46d1430335cd4ab13e3629410bb760c0ead6de476760fb/flask-sock-0.7.0.tar.gz", hash = "sha256:e023b578284195a443b8d8bdb4469e6a6acf694b89aeb51315b1a34fcf427b7d", size = 4334 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d8/98/107728ce3f430b5481eb426ccc5e1f7c8ab0bd01eaf231c62a8d528ff721/flask_sock-0.7.0-py3-none-any.whl", hash = "sha256:caac4d679392aaf010d02fabcf73d52019f5bdaf1c9c131ec5a428cb3491204a", size = 3982 },
]
[[package]]
name = "h11"
version = "0.16.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/01/ee/02a2c011bdab74c6fb3c75474d40b3052059d95df7e73351460c8588d963/h11-0.16.0.tar.gz", hash = "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1", size = 101250 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/04/4b/29cac41a4d98d144bf5f6d33995617b185d14b22401f75ca86f384e87ff1/h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86", size = 37515 },
]
[[package]]
name = "idna"
version = "3.10"
@@ -507,6 +533,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/48/64/562a527fc55fbf41fa70dae735929988215505cb5ec0809fb0aef921d4a0/shiboken6-6.9.2-cp39-abi3-win_arm64.whl", hash = "sha256:c5b827797b3d89d9b9a3753371ff533fcd4afc4531ca51a7c696952132098054", size = 1708948 },
]
[[package]]
name = "simple-websocket"
version = "1.1.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "wsproto" },
]
sdist = { url = "https://files.pythonhosted.org/packages/b0/d4/bfa032f961103eba93de583b161f0e6a5b63cebb8f2c7d0c6e6efe1e3d2e/simple_websocket-1.1.0.tar.gz", hash = "sha256:7939234e7aa067c534abdab3a9ed933ec9ce4691b0713c78acb195560aa52ae4", size = 17300 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/52/59/0782e51887ac6b07ffd1570e0364cf901ebc36345fea669969d2084baebb/simple_websocket-1.1.0-py3-none-any.whl", hash = "sha256:4af6069630a38ed6c561010f0e11a5bc0d4ca569b36306eb257cd9a192497c8c", size = 13842 },
]
[[package]]
name = "urllib3"
version = "2.5.0"
@@ -516,6 +554,48 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/a7/c2/fe1e52489ae3122415c51f387e221dd0773709bad6c6cdaa599e8a2c5185/urllib3-2.5.0-py3-none-any.whl", hash = "sha256:e6b01673c0fa6a13e374b50871808eb3bf7046c4b125b216f6bf1cc604cff0dc", size = 129795 },
]
[[package]]
name = "websockets"
version = "15.0.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/21/e6/26d09fab466b7ca9c7737474c52be4f76a40301b08362eb2dbc19dcc16c1/websockets-15.0.1.tar.gz", hash = "sha256:82544de02076bafba038ce055ee6412d68da13ab47f0c60cab827346de828dee", size = 177016 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/9f/32/18fcd5919c293a398db67443acd33fde142f283853076049824fc58e6f75/websockets-15.0.1-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:823c248b690b2fd9303ba00c4f66cd5e2d8c3ba4aa968b2779be9532a4dad431", size = 175423 },
{ url = "https://files.pythonhosted.org/packages/76/70/ba1ad96b07869275ef42e2ce21f07a5b0148936688c2baf7e4a1f60d5058/websockets-15.0.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:678999709e68425ae2593acf2e3ebcbcf2e69885a5ee78f9eb80e6e371f1bf57", size = 173082 },
{ url = "https://files.pythonhosted.org/packages/86/f2/10b55821dd40eb696ce4704a87d57774696f9451108cff0d2824c97e0f97/websockets-15.0.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:d50fd1ee42388dcfb2b3676132c78116490976f1300da28eb629272d5d93e905", size = 173330 },
{ url = "https://files.pythonhosted.org/packages/a5/90/1c37ae8b8a113d3daf1065222b6af61cc44102da95388ac0018fcb7d93d9/websockets-15.0.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d99e5546bf73dbad5bf3547174cd6cb8ba7273062a23808ffea025ecb1cf8562", size = 182878 },
{ url = "https://files.pythonhosted.org/packages/8e/8d/96e8e288b2a41dffafb78e8904ea7367ee4f891dafc2ab8d87e2124cb3d3/websockets-15.0.1-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:66dd88c918e3287efc22409d426c8f729688d89a0c587c88971a0faa2c2f3792", size = 181883 },
{ url = "https://files.pythonhosted.org/packages/93/1f/5d6dbf551766308f6f50f8baf8e9860be6182911e8106da7a7f73785f4c4/websockets-15.0.1-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8dd8327c795b3e3f219760fa603dcae1dcc148172290a8ab15158cf85a953413", size = 182252 },
{ url = "https://files.pythonhosted.org/packages/d4/78/2d4fed9123e6620cbf1706c0de8a1632e1a28e7774d94346d7de1bba2ca3/websockets-15.0.1-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:8fdc51055e6ff4adeb88d58a11042ec9a5eae317a0a53d12c062c8a8865909e8", size = 182521 },
{ url = "https://files.pythonhosted.org/packages/e7/3b/66d4c1b444dd1a9823c4a81f50231b921bab54eee2f69e70319b4e21f1ca/websockets-15.0.1-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:693f0192126df6c2327cce3baa7c06f2a117575e32ab2308f7f8216c29d9e2e3", size = 181958 },
{ url = "https://files.pythonhosted.org/packages/08/ff/e9eed2ee5fed6f76fdd6032ca5cd38c57ca9661430bb3d5fb2872dc8703c/websockets-15.0.1-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:54479983bd5fb469c38f2f5c7e3a24f9a4e70594cd68cd1fa6b9340dadaff7cf", size = 181918 },
{ url = "https://files.pythonhosted.org/packages/d8/75/994634a49b7e12532be6a42103597b71098fd25900f7437d6055ed39930a/websockets-15.0.1-cp311-cp311-win32.whl", hash = "sha256:16b6c1b3e57799b9d38427dda63edcbe4926352c47cf88588c0be4ace18dac85", size = 176388 },
{ url = "https://files.pythonhosted.org/packages/98/93/e36c73f78400a65f5e236cd376713c34182e6663f6889cd45a4a04d8f203/websockets-15.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:27ccee0071a0e75d22cb35849b1db43f2ecd3e161041ac1ee9d2352ddf72f065", size = 176828 },
{ url = "https://files.pythonhosted.org/packages/51/6b/4545a0d843594f5d0771e86463606a3988b5a09ca5123136f8a76580dd63/websockets-15.0.1-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:3e90baa811a5d73f3ca0bcbf32064d663ed81318ab225ee4f427ad4e26e5aff3", size = 175437 },
{ url = "https://files.pythonhosted.org/packages/f4/71/809a0f5f6a06522af902e0f2ea2757f71ead94610010cf570ab5c98e99ed/websockets-15.0.1-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:592f1a9fe869c778694f0aa806ba0374e97648ab57936f092fd9d87f8bc03665", size = 173096 },
{ url = "https://files.pythonhosted.org/packages/3d/69/1a681dd6f02180916f116894181eab8b2e25b31e484c5d0eae637ec01f7c/websockets-15.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:0701bc3cfcb9164d04a14b149fd74be7347a530ad3bbf15ab2c678a2cd3dd9a2", size = 173332 },
{ url = "https://files.pythonhosted.org/packages/a6/02/0073b3952f5bce97eafbb35757f8d0d54812b6174ed8dd952aa08429bcc3/websockets-15.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e8b56bdcdb4505c8078cb6c7157d9811a85790f2f2b3632c7d1462ab5783d215", size = 183152 },
{ url = "https://files.pythonhosted.org/packages/74/45/c205c8480eafd114b428284840da0b1be9ffd0e4f87338dc95dc6ff961a1/websockets-15.0.1-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0af68c55afbd5f07986df82831c7bff04846928ea8d1fd7f30052638788bc9b5", size = 182096 },
{ url = "https://files.pythonhosted.org/packages/14/8f/aa61f528fba38578ec553c145857a181384c72b98156f858ca5c8e82d9d3/websockets-15.0.1-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:64dee438fed052b52e4f98f76c5790513235efaa1ef7f3f2192c392cd7c91b65", size = 182523 },
{ url = "https://files.pythonhosted.org/packages/ec/6d/0267396610add5bc0d0d3e77f546d4cd287200804fe02323797de77dbce9/websockets-15.0.1-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:d5f6b181bb38171a8ad1d6aa58a67a6aa9d4b38d0f8c5f496b9e42561dfc62fe", size = 182790 },
{ url = "https://files.pythonhosted.org/packages/02/05/c68c5adbf679cf610ae2f74a9b871ae84564462955d991178f95a1ddb7dd/websockets-15.0.1-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:5d54b09eba2bada6011aea5375542a157637b91029687eb4fdb2dab11059c1b4", size = 182165 },
{ url = "https://files.pythonhosted.org/packages/29/93/bb672df7b2f5faac89761cb5fa34f5cec45a4026c383a4b5761c6cea5c16/websockets-15.0.1-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:3be571a8b5afed347da347bfcf27ba12b069d9d7f42cb8c7028b5e98bbb12597", size = 182160 },
{ url = "https://files.pythonhosted.org/packages/ff/83/de1f7709376dc3ca9b7eeb4b9a07b4526b14876b6d372a4dc62312bebee0/websockets-15.0.1-cp312-cp312-win32.whl", hash = "sha256:c338ffa0520bdb12fbc527265235639fb76e7bc7faafbb93f6ba80d9c06578a9", size = 176395 },
{ url = "https://files.pythonhosted.org/packages/7d/71/abf2ebc3bbfa40f391ce1428c7168fb20582d0ff57019b69ea20fa698043/websockets-15.0.1-cp312-cp312-win_amd64.whl", hash = "sha256:fcd5cf9e305d7b8338754470cf69cf81f420459dbae8a3b40cee57417f4614a7", size = 176841 },
{ url = "https://files.pythonhosted.org/packages/cb/9f/51f0cf64471a9d2b4d0fc6c534f323b664e7095640c34562f5182e5a7195/websockets-15.0.1-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:ee443ef070bb3b6ed74514f5efaa37a252af57c90eb33b956d35c8e9c10a1931", size = 175440 },
{ url = "https://files.pythonhosted.org/packages/8a/05/aa116ec9943c718905997412c5989f7ed671bc0188ee2ba89520e8765d7b/websockets-15.0.1-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:5a939de6b7b4e18ca683218320fc67ea886038265fd1ed30173f5ce3f8e85675", size = 173098 },
{ url = "https://files.pythonhosted.org/packages/ff/0b/33cef55ff24f2d92924923c99926dcce78e7bd922d649467f0eda8368923/websockets-15.0.1-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:746ee8dba912cd6fc889a8147168991d50ed70447bf18bcda7039f7d2e3d9151", size = 173329 },
{ url = "https://files.pythonhosted.org/packages/31/1d/063b25dcc01faa8fada1469bdf769de3768b7044eac9d41f734fd7b6ad6d/websockets-15.0.1-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:595b6c3969023ecf9041b2936ac3827e4623bfa3ccf007575f04c5a6aa318c22", size = 183111 },
{ url = "https://files.pythonhosted.org/packages/93/53/9a87ee494a51bf63e4ec9241c1ccc4f7c2f45fff85d5bde2ff74fcb68b9e/websockets-15.0.1-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:3c714d2fc58b5ca3e285461a4cc0c9a66bd0e24c5da9911e30158286c9b5be7f", size = 182054 },
{ url = "https://files.pythonhosted.org/packages/ff/b2/83a6ddf56cdcbad4e3d841fcc55d6ba7d19aeb89c50f24dd7e859ec0805f/websockets-15.0.1-cp313-cp313-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0f3c1e2ab208db911594ae5b4f79addeb3501604a165019dd221c0bdcabe4db8", size = 182496 },
{ url = "https://files.pythonhosted.org/packages/98/41/e7038944ed0abf34c45aa4635ba28136f06052e08fc2168520bb8b25149f/websockets-15.0.1-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:229cf1d3ca6c1804400b0a9790dc66528e08a6a1feec0d5040e8b9eb14422375", size = 182829 },
{ url = "https://files.pythonhosted.org/packages/e0/17/de15b6158680c7623c6ef0db361da965ab25d813ae54fcfeae2e5b9ef910/websockets-15.0.1-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:756c56e867a90fb00177d530dca4b097dd753cde348448a1012ed6c5131f8b7d", size = 182217 },
{ url = "https://files.pythonhosted.org/packages/33/2b/1f168cb6041853eef0362fb9554c3824367c5560cbdaad89ac40f8c2edfc/websockets-15.0.1-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:558d023b3df0bffe50a04e710bc87742de35060580a293c2a984299ed83bc4e4", size = 182195 },
{ url = "https://files.pythonhosted.org/packages/86/eb/20b6cdf273913d0ad05a6a14aed4b9a85591c18a987a3d47f20fa13dcc47/websockets-15.0.1-cp313-cp313-win32.whl", hash = "sha256:ba9e56e8ceeeedb2e080147ba85ffcd5cd0711b89576b83784d8605a7df455fa", size = 176393 },
{ url = "https://files.pythonhosted.org/packages/1b/6c/c65773d6cab416a64d191d6ee8a8b1c68a09970ea6909d16965d26bfed1e/websockets-15.0.1-cp313-cp313-win_amd64.whl", hash = "sha256:e09473f095a819042ecb2ab9465aee615bd9c2028e4ef7d933600a8401c79561", size = 176837 },
{ url = "https://files.pythonhosted.org/packages/fa/a8/5b41e0da817d64113292ab1f8247140aac61cbf6cfd085d6a0fa77f4984f/websockets-15.0.1-py3-none-any.whl", hash = "sha256:f7a866fbc1e97b5c617ee4116daaa09b722101d4a3c170c787450ba409f9736f", size = 169743 },
]
[[package]]
name = "werkzeug"
version = "3.1.3"
@@ -527,3 +607,15 @@ sdist = { url = "https://files.pythonhosted.org/packages/9f/69/83029f1f6300c5fb2
wheels = [
{ url = "https://files.pythonhosted.org/packages/52/24/ab44c871b0f07f491e5d2ad12c9bd7358e527510618cb1b803a88e986db1/werkzeug-3.1.3-py3-none-any.whl", hash = "sha256:54b78bf3716d19a65be4fceccc0d1d7b89e608834989dfae50ea87564639213e", size = 224498 },
]
[[package]]
name = "wsproto"
version = "1.3.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "h11" },
]
sdist = { url = "https://files.pythonhosted.org/packages/c7/79/12135bdf8b9c9367b8701c2c19a14c913c120b882d50b014ca0d38083c2c/wsproto-1.3.2.tar.gz", hash = "sha256:b86885dcf294e15204919950f666e06ffc6c7c114ca900b060d6e16293528294", size = 50116 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/a4/f5/10b68b7b1544245097b2a1b8238f66f2fc6dcaeb24ba5d917f52bd2eed4f/wsproto-1.3.2-py3-none-any.whl", hash = "sha256:61eea322cdf56e8cc904bd3ad7573359a242ba65688716b0710a5eb12beab584", size = 24405 },
]