add limits to info

This commit is contained in:
Game_Time
2025-09-25 16:43:55 +05:00
parent e56416d653
commit 6c0d63869e
4 changed files with 388 additions and 3 deletions

View File

@@ -1,19 +1,191 @@
from __future__ import annotations
import errno
import argparse
import json
import errno
import os
import sys
import webbrowser
from datetime import datetime
from .app import create_app
from .config import CLIENT_ID_DEFAULT
from .limits import RateLimitWindow, compute_reset_at, load_rate_limit_snapshot
from .oauth import OAuthHTTPServer, OAuthHandler, REQUIRED_PORT, URL_BASE
from .utils import eprint, get_home_dir, load_chatgpt_tokens, parse_jwt_claims, read_auth_file
import os
_STATUS_LIMIT_BAR_SEGMENTS = 30
_STATUS_LIMIT_BAR_FILLED = ""
_STATUS_LIMIT_BAR_EMPTY = ""
_STATUS_LIMIT_BAR_PARTIAL = ""
def _clamp_percent(value: float) -> float:
try:
percent = float(value)
except Exception:
return 0.0
if percent != percent:
return 0.0
if percent < 0.0:
return 0.0
if percent > 100.0:
return 100.0
return percent
def _render_progress_bar(percent_used: float) -> str:
    """Draw *percent_used* as a bracketed bar of fixed width.

    The bar is ``_STATUS_LIMIT_BAR_SEGMENTS`` cells wide.  Whole cells use
    the "filled" glyph; when the fractional remainder is greater than half
    a cell, one extra cell is drawn with the "partial" glyph; the rest use
    the "empty" glyph.
    """
    fraction = max(0.0, min(1.0, percent_used / 100.0))
    exact_cells = fraction * _STATUS_LIMIT_BAR_SEGMENTS
    whole_cells = int(exact_cells)
    show_partial = (exact_cells - whole_cells) > 0.5

    used_cells = whole_cells + 1 if show_partial else whole_cells
    used_cells = max(0, min(_STATUS_LIMIT_BAR_SEGMENTS, used_cells))
    blank_cells = _STATUS_LIMIT_BAR_SEGMENTS - used_cells

    if show_partial and used_cells > 0:
        # The last used cell is the partial one.
        pieces = (
            _STATUS_LIMIT_BAR_FILLED * (used_cells - 1),
            _STATUS_LIMIT_BAR_PARTIAL,
            _STATUS_LIMIT_BAR_EMPTY * blank_cells,
        )
    else:
        pieces = (
            _STATUS_LIMIT_BAR_FILLED * used_cells,
            _STATUS_LIMIT_BAR_EMPTY * blank_cells,
        )
    return "[" + "".join(pieces) + "]"
def _get_usage_color(percent_used: float) -> str:
if percent_used >= 90:
return "\033[91m"
elif percent_used >= 75:
return "\033[93m"
elif percent_used >= 50:
return "\033[94m"
else:
return "\033[92m"
def _reset_color() -> str:
"""ANSI reset color code"""
return "\033[0m"
def _format_window_duration(minutes: int | None) -> str | None:
if minutes is None:
return None
try:
total = int(minutes)
except Exception:
return None
if total <= 0:
return None
minutes = total
weeks, remainder = divmod(minutes, 7 * 24 * 60)
days, remainder = divmod(remainder, 24 * 60)
hours, remainder = divmod(remainder, 60)
parts = []
if weeks:
parts.append(f"{weeks} week" + ("s" if weeks != 1 else ""))
if days:
parts.append(f"{days} day" + ("s" if days != 1 else ""))
if hours:
parts.append(f"{hours} hour" + ("s" if hours != 1 else ""))
if remainder:
parts.append(f"{remainder} minute" + ("s" if remainder != 1 else ""))
if not parts:
parts.append(f"{minutes} minute" + ("s" if minutes != 1 else ""))
return " ".join(parts)
def _format_reset_duration(seconds: int | None) -> str | None:
if seconds is None:
return None
try:
value = int(seconds)
except Exception:
return None
if value < 0:
value = 0
days, remainder = divmod(value, 86400)
hours, remainder = divmod(remainder, 3600)
minutes, remainder = divmod(remainder, 60)
parts: list[str] = []
if days:
parts.append(f"{days}d")
if hours:
parts.append(f"{hours}h")
if minutes:
parts.append(f"{minutes}m")
if not parts and remainder:
parts.append("under 1m")
if not parts:
parts.append("0m")
return " ".join(parts)
def _format_local_datetime(dt: datetime) -> str:
local = dt.astimezone()
tz_name = local.tzname() or "local"
return f"{local.strftime('%b %d, %Y %H:%M')} {tz_name}"
def _print_usage_limits_block() -> None:
    """Print the most recently stored usage-limit snapshot to stdout.

    Loads the snapshot via ``load_rate_limit_snapshot()`` and, for each
    window present (primary, then secondary), prints a coloured progress
    bar, the used/remaining percentages and the reset information.

    The "Resets in" countdown is computed against the current time rather
    than echoing the value recorded at capture time; otherwise an old
    snapshot would show a countdown that contradicts the absolute reset
    time printed next to it.  Once the reset time has passed the countdown
    is shown as ``0m``.
    """
    stored = load_rate_limit_snapshot()

    print("📊 Usage Limits")

    if stored is None:
        print(" No usage data available yet. Send a request through ChatMock first.")
        print()
        return

    update_time = _format_local_datetime(stored.captured_at)
    print(f"Last updated: {update_time}")
    print()

    windows: list[tuple[str, str, RateLimitWindow]] = []
    if stored.snapshot.primary is not None:
        # NOTE(review): the icon is an empty string in the reviewed text —
        # possibly a glyph lost in transit; confirm the intended value.
        windows.append(("", "5 hour limit", stored.snapshot.primary))
    if stored.snapshot.secondary is not None:
        windows.append(("📅", "Weekly limit", stored.snapshot.secondary))

    if not windows:
        print(" Usage data was captured but no limit windows were provided.")
        print()
        return

    for i, (icon_label, desc, window) in enumerate(windows):
        if i > 0:
            # Blank line between consecutive windows.
            print()

        percent_used = _clamp_percent(window.used_percent)
        remaining = max(0.0, 100.0 - percent_used)
        color = _get_usage_color(percent_used)
        reset = _reset_color()
        progress = _render_progress_bar(percent_used)
        usage_text = f"{percent_used:5.1f}% used"
        remaining_text = f"{remaining:5.1f}% left"

        print(f"{icon_label} {desc}")
        print(f"{color}{progress}{reset} {color}{usage_text}{reset} | {remaining_text}")

        reset_at = compute_reset_at(stored.captured_at, window)
        seconds_left = window.resets_in_seconds
        if reset_at is not None:
            # Measure the countdown from "now" in the same timezone as
            # reset_at so the subtraction is well defined.
            time_left = reset_at - datetime.now(reset_at.tzinfo)
            seconds_left = max(0, int(time_left.total_seconds()))
        reset_in = _format_reset_duration(seconds_left)

        if reset_in and reset_at:
            reset_at_str = _format_local_datetime(reset_at)
            print(f" ⏳ Resets in: {reset_in} at {reset_at_str}")
        elif reset_in:
            print(f" ⏳ Resets in: {reset_in}")
        elif reset_at:
            reset_at_str = _format_local_datetime(reset_at)
            print(f" ⏳ Resets at: {reset_at_str}")

    print()
def cmd_login(no_browser: bool, verbose: bool) -> int:
home_dir = get_home_dir()
client_id = CLIENT_ID_DEFAULT
@@ -197,6 +369,8 @@ def main() -> None:
print("👤 Account")
print(" • Not signed in")
print(" • Run: python3 chatmock.py login")
print("")
_print_usage_limits_block()
sys.exit(0)
id_claims = parse_jwt_claims(id_token) or {}
@@ -219,6 +393,8 @@ def main() -> None:
print(f" • Plan: {plan}")
if account_id:
print(f" • Account ID: {account_id}")
print("")
_print_usage_limits_block()
sys.exit(0)
else:
parser.error("Unknown command")
@@ -226,4 +402,3 @@ def main() -> None:
if __name__ == "__main__":
main()

200
chatmock/limits.py Normal file
View File

@@ -0,0 +1,200 @@
from __future__ import annotations
import json
import os
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Mapping, Optional
from .utils import get_home_dir
# Response-header names read by parse_rate_limit_headers() for the primary
# rate-limit window: percent used, window length in minutes, and seconds
# until the window resets.
_PRIMARY_USED = "x-codex-primary-used-percent"
_PRIMARY_WINDOW = "x-codex-primary-window-minutes"
_PRIMARY_RESET = "x-codex-primary-reset-after-seconds"
# The same three headers for the secondary rate-limit window.
_SECONDARY_USED = "x-codex-secondary-used-percent"
_SECONDARY_WINDOW = "x-codex-secondary-window-minutes"
_SECONDARY_RESET = "x-codex-secondary-reset-after-seconds"
# File name, joined onto get_home_dir() by _limits_path(), where the latest
# snapshot is persisted as JSON.
_LIMITS_FILENAME = "usage_limits.json"
@dataclass
class RateLimitWindow:
    """Usage figures for one rate-limit window.

    Instances are built from response headers by ``_parse_window`` and from
    the persisted JSON by ``_dict_to_window``.
    """

    # Share of the window already consumed, as parsed by _parse_float.
    used_percent: float
    # Window length in minutes; None when absent or unparsable.
    window_minutes: Optional[int]
    # Seconds until reset, relative to when the value was captured; None
    # when absent or unparsable.
    resets_in_seconds: Optional[int]
@dataclass
class RateLimitSnapshot:
    """The primary and secondary rate-limit windows seen together.

    Either window may be ``None``; the parsers in this module return no
    snapshot at all when both are missing.
    """

    # Window parsed from the "x-codex-primary-*" headers, if present.
    primary: Optional[RateLimitWindow]
    # Window parsed from the "x-codex-secondary-*" headers, if present.
    secondary: Optional[RateLimitWindow]
@dataclass
class StoredRateLimitSnapshot:
    """A snapshot as read back from disk, with its capture timestamp."""

    # When the snapshot was recorded; load_rate_limit_snapshot() always
    # supplies a timezone-aware value (naive timestamps are taken as UTC).
    captured_at: datetime
    # The rate-limit windows recorded at that time.
    snapshot: RateLimitSnapshot
def _parse_float(value: Any) -> Optional[float]:
try:
if value is None:
return None
if isinstance(value, (int, float)):
return float(value)
value_str = str(value).strip()
if not value_str:
return None
parsed = float(value_str)
if not (parsed == parsed and parsed not in (float("inf"), float("-inf"))):
return None
return parsed
except Exception:
return None
def _parse_int(value: Any) -> Optional[int]:
try:
if value is None:
return None
if isinstance(value, bool):
return None
if isinstance(value, int):
return value
value_str = str(value).strip()
if not value_str:
return None
return int(value_str)
except Exception:
return None
def _parse_window(headers: Mapping[str, Any], used_key: str, window_key: str, reset_key: str) -> Optional[RateLimitWindow]:
    """Build one ``RateLimitWindow`` from three header values.

    The used-percent header is mandatory: when it is missing or cannot be
    parsed, the whole window is reported as ``None``.  The window-length
    and reset headers are optional and become ``None`` individually.
    """
    used = _parse_float(headers.get(used_key))
    if used is None:
        return None
    return RateLimitWindow(
        used_percent=used,
        window_minutes=_parse_int(headers.get(window_key)),
        resets_in_seconds=_parse_int(headers.get(reset_key)),
    )
def parse_rate_limit_headers(headers: Mapping[str, Any]) -> Optional[RateLimitSnapshot]:
    """Extract a ``RateLimitSnapshot`` from response headers.

    Returns ``None`` when neither the primary nor the secondary window can
    be parsed, or when anything goes wrong while reading *headers*.
    """
    try:
        parsed = {
            "primary": _parse_window(headers, _PRIMARY_USED, _PRIMARY_WINDOW, _PRIMARY_RESET),
            "secondary": _parse_window(headers, _SECONDARY_USED, _SECONDARY_WINDOW, _SECONDARY_RESET),
        }
        if all(window is None for window in parsed.values()):
            return None
        return RateLimitSnapshot(**parsed)
    except Exception:
        return None
def _limits_path() -> str:
    """Return the path of the persisted usage-limits file inside the home dir."""
    return os.path.join(get_home_dir(), _LIMITS_FILENAME)
def store_rate_limit_snapshot(snapshot: RateLimitSnapshot, captured_at: Optional[datetime] = None) -> None:
    """Persist *snapshot* as JSON in the usage-limits file.

    *captured_at* defaults to the current UTC time.  Only windows that are
    present are written.  Where ``os.fchmod`` exists the file is restricted
    to mode ``0o600``.  Persistence is best-effort: every error is
    swallowed so that a failed write never affects the caller.
    """
    timestamp = captured_at or datetime.now(timezone.utc)
    try:
        os.makedirs(get_home_dir(), exist_ok=True)

        document: dict[str, Any] = {"captured_at": timestamp.isoformat()}
        for key, window in (("primary", snapshot.primary), ("secondary", snapshot.secondary)):
            if window:
                document[key] = {
                    "used_percent": window.used_percent,
                    "window_minutes": window.window_minutes,
                    "resets_in_seconds": window.resets_in_seconds,
                }

        with open(_limits_path(), "w", encoding="utf-8") as handle:
            if hasattr(os, "fchmod"):
                try:
                    os.fchmod(handle.fileno(), 0o600)
                except OSError:
                    # Tightening permissions is optional.
                    pass
            json.dump(document, handle, indent=2)
    except Exception:
        # Silently ignore persistence errors.
        pass
def load_rate_limit_snapshot() -> Optional[StoredRateLimitSnapshot]:
    """Read the persisted usage-limits file back into a snapshot.

    Returns ``None`` when the file is missing, unreadable, not valid JSON,
    not a JSON object, lacks a parsable ``captured_at`` timestamp, or
    contains neither a primary nor a secondary window.
    """
    try:
        with open(_limits_path(), "r", encoding="utf-8") as fp:
            raw = json.load(fp)
    except Exception:
        # Covers a missing file as well as I/O and decoding errors.
        return None

    # A valid JSON document need not be an object (it could be a list, a
    # string, null, ...); without this guard ``raw.get`` would raise.
    if not isinstance(raw, dict):
        return None

    captured_at = _parse_datetime(raw.get("captured_at"))
    if captured_at is None:
        return None

    snapshot = RateLimitSnapshot(
        primary=_dict_to_window(raw.get("primary")),
        secondary=_dict_to_window(raw.get("secondary")),
    )
    if snapshot.primary is None and snapshot.secondary is None:
        return None
    return StoredRateLimitSnapshot(captured_at=captured_at, snapshot=snapshot)
def _parse_datetime(value: Any) -> Optional[datetime]:
if not isinstance(value, str):
return None
text = value.strip()
if not text:
return None
if text.endswith("Z"):
text = text[:-1] + "+00:00"
try:
dt = datetime.fromisoformat(text)
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt
except ValueError:
return None
def _dict_to_window(value: Any) -> Optional[RateLimitWindow]:
    """Rebuild a ``RateLimitWindow`` from its persisted dict form.

    Returns ``None`` when *value* is not a dict or its ``used_percent``
    entry is missing or unparsable; the other two fields are optional.
    """
    if not isinstance(value, dict):
        return None
    used_percent = _parse_float(value.get("used_percent"))
    if used_percent is None:
        return None
    return RateLimitWindow(
        used_percent=used_percent,
        window_minutes=_parse_int(value.get("window_minutes")),
        resets_in_seconds=_parse_int(value.get("resets_in_seconds")),
    )
def record_rate_limits_from_response(response: Any) -> None:
    """Persist any rate-limit headers found on *response*.

    Does nothing when *response* is ``None``, has no ``headers`` attribute
    (or it is ``None``), or the headers carry no parsable window.
    """
    headers = None if response is None else getattr(response, "headers", None)
    if headers is None:
        return
    snapshot = parse_rate_limit_headers(headers)
    if snapshot is not None:
        store_rate_limit_snapshot(snapshot)
def compute_reset_at(captured_at: datetime, window: RateLimitWindow) -> Optional[datetime]:
    """Return the absolute time at which *window* resets.

    The result is *captured_at* plus the window's ``resets_in_seconds``.
    Returns ``None`` when that field is ``None`` or the addition fails
    (for example because the value is not convertible to ``int``).
    """
    seconds = window.resets_in_seconds
    if seconds is None:
        return None
    try:
        offset = timedelta(seconds=int(seconds))
        return captured_at + offset
    except Exception:
        return None

View File

@@ -8,6 +8,7 @@ from typing import Any, Dict, List
from flask import Blueprint, Response, current_app, jsonify, make_response, request, stream_with_context
from .config import BASE_INSTRUCTIONS, GPT5_CODEX_INSTRUCTIONS
from .limits import record_rate_limits_from_response
from .http import build_cors_headers
from .reasoning import build_reasoning_param, extract_reasoning_from_model_name
from .transform import convert_ollama_messages, normalize_ollama_tools
@@ -206,6 +207,8 @@ def ollama_chat() -> Response:
if error_resp is not None:
return error_resp
record_rate_limits_from_response(upstream)
if upstream.status_code >= 400:
try:
err_body = json.loads(upstream.content.decode("utf-8", errors="ignore")) if upstream.content else {"raw": upstream.text}
@@ -225,6 +228,7 @@ def ollama_chat() -> Response:
parallel_tool_calls=parallel_tool_calls,
reasoning_param=build_reasoning_param(reasoning_effort, reasoning_summary, model_reasoning),
)
record_rate_limits_from_response(upstream2)
if err2 is None and upstream2 is not None and upstream2.status_code < 400:
upstream = upstream2
else:

View File

@@ -7,6 +7,7 @@ from typing import Any, Dict, List
from flask import Blueprint, Response, current_app, jsonify, make_response, request
from .config import BASE_INSTRUCTIONS, GPT5_CODEX_INSTRUCTIONS
from .limits import record_rate_limits_from_response
from .http import build_cors_headers
from .reasoning import apply_reasoning_to_message, build_reasoning_param, extract_reasoning_from_model_name
from .upstream import normalize_model_name, start_upstream_request
@@ -143,6 +144,8 @@ def chat_completions() -> Response:
if error_resp is not None:
return error_resp
record_rate_limits_from_response(upstream)
created = int(time.time())
if upstream.status_code >= 400:
try:
@@ -164,6 +167,7 @@ def chat_completions() -> Response:
parallel_tool_calls=parallel_tool_calls,
reasoning_param=reasoning_param,
)
record_rate_limits_from_response(upstream2)
if err2 is None and upstream2 is not None and upstream2.status_code < 400:
upstream = upstream2
else:
@@ -342,6 +346,8 @@ def completions() -> Response:
if error_resp is not None:
return error_resp
record_rate_limits_from_response(upstream)
created = int(time.time())
if upstream.status_code >= 400:
try: