From 6c0d63869e1b6ec54166334b431fe4acbc5a3ab9 Mon Sep 17 00:00:00 2001 From: Game_Time <108236317+RayBytes@users.noreply.github.com> Date: Thu, 25 Sep 2025 16:43:55 +0500 Subject: [PATCH] add limits to info --- chatmock/cli.py | 181 +++++++++++++++++++++++++++++++++- chatmock/limits.py | 200 ++++++++++++++++++++++++++++++++++++++ chatmock/routes_ollama.py | 4 + chatmock/routes_openai.py | 6 ++ 4 files changed, 388 insertions(+), 3 deletions(-) create mode 100644 chatmock/limits.py diff --git a/chatmock/cli.py b/chatmock/cli.py index 9280d42..d06d677 100644 --- a/chatmock/cli.py +++ b/chatmock/cli.py @@ -1,19 +1,191 @@ from __future__ import annotations +import errno import argparse import json -import errno import os import sys import webbrowser +from datetime import datetime from .app import create_app from .config import CLIENT_ID_DEFAULT +from .limits import RateLimitWindow, compute_reset_at, load_rate_limit_snapshot from .oauth import OAuthHTTPServer, OAuthHandler, REQUIRED_PORT, URL_BASE from .utils import eprint, get_home_dir, load_chatgpt_tokens, parse_jwt_claims, read_auth_file -import os +_STATUS_LIMIT_BAR_SEGMENTS = 30 +_STATUS_LIMIT_BAR_FILLED = "█" +_STATUS_LIMIT_BAR_EMPTY = "░" +_STATUS_LIMIT_BAR_PARTIAL = "▓" + + +def _clamp_percent(value: float) -> float: + try: + percent = float(value) + except Exception: + return 0.0 + if percent != percent: + return 0.0 + if percent < 0.0: + return 0.0 + if percent > 100.0: + return 100.0 + return percent + + +def _render_progress_bar(percent_used: float) -> str: + ratio = max(0.0, min(1.0, percent_used / 100.0)) + filled_exact = ratio * _STATUS_LIMIT_BAR_SEGMENTS + filled = int(filled_exact) + partial = filled_exact - filled + + has_partial = partial > 0.5 + if has_partial: + filled += 1 + + filled = max(0, min(_STATUS_LIMIT_BAR_SEGMENTS, filled)) + empty = _STATUS_LIMIT_BAR_SEGMENTS - filled + + if has_partial and filled > 0: + bar = _STATUS_LIMIT_BAR_FILLED * (filled - 1) + _STATUS_LIMIT_BAR_PARTIAL + _STATUS_LIMIT_BAR_EMPTY * empty + else: + bar = _STATUS_LIMIT_BAR_FILLED * filled + _STATUS_LIMIT_BAR_EMPTY * empty + + return f"[{bar}]" + + +def _get_usage_color(percent_used: float) -> str: + if percent_used >= 90: + return "\033[91m" + elif percent_used >= 75: + return "\033[93m" + elif percent_used >= 50: + return "\033[94m" + else: + return "\033[92m" + + +def _reset_color() -> str: + """ANSI reset color code""" + return "\033[0m" + + +def _format_window_duration(minutes: int | None) -> str | None: + if minutes is None: + return None + try: + total = int(minutes) + except Exception: + return None + if total <= 0: + return None + minutes = total + weeks, remainder = divmod(minutes, 7 * 24 * 60) + days, remainder = divmod(remainder, 24 * 60) + hours, remainder = divmod(remainder, 60) + parts = [] + if weeks: + parts.append(f"{weeks} week" + ("s" if weeks != 1 else "")) + if days: + parts.append(f"{days} day" + ("s" if days != 1 else "")) + if hours: + parts.append(f"{hours} hour" + ("s" if hours != 1 else "")) + if remainder: + parts.append(f"{remainder} minute" + ("s" if remainder != 1 else "")) + if not parts: + parts.append(f"{minutes} minute" + ("s" if minutes != 1 else "")) + return " ".join(parts) + + +def _format_reset_duration(seconds: int | None) -> str | None: + if seconds is None: + return None + try: + value = int(seconds) + except Exception: + return None + if value < 0: + value = 0 + days, remainder = divmod(value, 86400) + hours, remainder = divmod(remainder, 3600) + minutes, remainder = divmod(remainder, 60) + parts: list[str] = [] + if days: + parts.append(f"{days}d") + if hours: + parts.append(f"{hours}h") + if minutes: + parts.append(f"{minutes}m") + if not parts and remainder: + parts.append("under 1m") + if not parts: + parts.append("0m") + return " ".join(parts) + + +def _format_local_datetime(dt: datetime) -> str: + local = dt.astimezone() + tz_name = local.tzname() or "local" + return f"{local.strftime('%b %d, %Y %H:%M')} {tz_name}" + + +def _print_usage_limits_block() -> None: + stored = load_rate_limit_snapshot() + + print("📊 Usage Limits") + + if stored is None: + print(" No usage data available yet. Send a request through ChatMock first.") + print() + return + + update_time = _format_local_datetime(stored.captured_at) + print(f"Last updated: {update_time}") + print() + + windows: list[tuple[str, str, RateLimitWindow]] = [] + if stored.snapshot.primary is not None: + windows.append(("⚡", "5 hour limit", stored.snapshot.primary)) + if stored.snapshot.secondary is not None: + windows.append(("📅", "Weekly limit", stored.snapshot.secondary)) + + if not windows: + print(" Usage data was captured but no limit windows were provided.") + print() + return + + for i, (icon_label, desc, window) in enumerate(windows): + if i > 0: + print() + + percent_used = _clamp_percent(window.used_percent) + remaining = max(0.0, 100.0 - percent_used) + color = _get_usage_color(percent_used) + reset = _reset_color() + + progress = _render_progress_bar(percent_used) + usage_text = f"{percent_used:5.1f}% used" + remaining_text = f"{remaining:5.1f}% left" + + print(f"{icon_label} {desc}") + print(f"{color}{progress}{reset} {color}{usage_text}{reset} | {remaining_text}") + + reset_in = _format_reset_duration(window.resets_in_seconds) + reset_at = compute_reset_at(stored.captured_at, window) + + if reset_in and reset_at: + reset_at_str = _format_local_datetime(reset_at) + print(f" ⏳ Resets in: {reset_in} at {reset_at_str}") + elif reset_in: + print(f" ⏳ Resets in: {reset_in}") + elif reset_at: + reset_at_str = _format_local_datetime(reset_at) + print(f" ⏳ Resets at: {reset_at_str}") + + print() + def cmd_login(no_browser: bool, verbose: bool) -> int: home_dir = get_home_dir() client_id = CLIENT_ID_DEFAULT @@ -197,6 +369,8 @@ def main() -> None: print("👤 Account") print(" • Not signed in") print(" • Run: python3 chatmock.py login") + print("") + _print_usage_limits_block() sys.exit(0) id_claims = parse_jwt_claims(id_token) or {} @@ -219,6 +393,8 @@ def main() -> None: print(f" • Plan: {plan}") if account_id: print(f" • Account ID: {account_id}") + print("") + _print_usage_limits_block() sys.exit(0) else: parser.error("Unknown command") @@ -226,4 +402,3 @@ def main() -> None: if __name__ == "__main__": main() - diff --git a/chatmock/limits.py b/chatmock/limits.py new file mode 100644 index 0000000..862076c --- /dev/null +++ b/chatmock/limits.py @@ -0,0 +1,200 @@ +from __future__ import annotations + +import json +import os +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from typing import Any, Mapping, Optional + +from .utils import get_home_dir + +_PRIMARY_USED = "x-codex-primary-used-percent" +_PRIMARY_WINDOW = "x-codex-primary-window-minutes" +_PRIMARY_RESET = "x-codex-primary-reset-after-seconds" +_SECONDARY_USED = "x-codex-secondary-used-percent" +_SECONDARY_WINDOW = "x-codex-secondary-window-minutes" +_SECONDARY_RESET = "x-codex-secondary-reset-after-seconds" + +_LIMITS_FILENAME = "usage_limits.json" + + +@dataclass +class RateLimitWindow: + used_percent: float + window_minutes: Optional[int] + resets_in_seconds: Optional[int] + + +@dataclass +class RateLimitSnapshot: + primary: Optional[RateLimitWindow] + secondary: Optional[RateLimitWindow] + + +@dataclass +class StoredRateLimitSnapshot: + captured_at: datetime + snapshot: RateLimitSnapshot + + +def _parse_float(value: Any) -> Optional[float]: + try: + if value is None: + return None + if isinstance(value, (int, float)): + return float(value) + value_str = str(value).strip() + if not value_str: + return None + parsed = float(value_str) + if not (parsed == parsed and parsed not in (float("inf"), float("-inf"))): + return None + return parsed + except Exception: + return None + + +def _parse_int(value: Any) -> Optional[int]: + try: + if value is None: + return None + if isinstance(value, bool): + return None + if isinstance(value, int): + return value + value_str = str(value).strip() + if not value_str: + return None + return int(value_str) + except Exception: + return None + + +def _parse_window(headers: Mapping[str, Any], used_key: str, window_key: str, reset_key: str) -> Optional[RateLimitWindow]: + used_percent = _parse_float(headers.get(used_key)) + if used_percent is None: + return None + window_minutes = _parse_int(headers.get(window_key)) + resets_in_seconds = _parse_int(headers.get(reset_key)) + return RateLimitWindow(used_percent=used_percent, window_minutes=window_minutes, resets_in_seconds=resets_in_seconds) + + +def parse_rate_limit_headers(headers: Mapping[str, Any]) -> Optional[RateLimitSnapshot]: + try: + primary = _parse_window(headers, _PRIMARY_USED, _PRIMARY_WINDOW, _PRIMARY_RESET) + secondary = _parse_window(headers, _SECONDARY_USED, _SECONDARY_WINDOW, _SECONDARY_RESET) + if primary is None and secondary is None: + return None + return RateLimitSnapshot(primary=primary, secondary=secondary) + except Exception: + return None + + +def _limits_path() -> str: + home = get_home_dir() + return os.path.join(home, _LIMITS_FILENAME) + + +def store_rate_limit_snapshot(snapshot: RateLimitSnapshot, captured_at: Optional[datetime] = None) -> None: + captured = captured_at or datetime.now(timezone.utc) + try: + home = get_home_dir() + os.makedirs(home, exist_ok=True) + payload: dict[str, Any] = { + "captured_at": captured.isoformat(), + } + if snapshot.primary: + payload["primary"] = { + "used_percent": snapshot.primary.used_percent, + "window_minutes": snapshot.primary.window_minutes, + "resets_in_seconds": snapshot.primary.resets_in_seconds, + } + if snapshot.secondary: + payload["secondary"] = { + "used_percent": snapshot.secondary.used_percent, + "window_minutes": snapshot.secondary.window_minutes, + "resets_in_seconds": snapshot.secondary.resets_in_seconds, + } + with open(_limits_path(), "w", encoding="utf-8") as fp: + if hasattr(os, "fchmod"): + try: + os.fchmod(fp.fileno(), 0o600) + except OSError: + pass + json.dump(payload, fp, indent=2) + except Exception: + # Silently ignore persistence errors. + pass + + +def load_rate_limit_snapshot() -> Optional[StoredRateLimitSnapshot]: + try: + with open(_limits_path(), "r", encoding="utf-8") as fp: + raw = json.load(fp) + except FileNotFoundError: + return None + except Exception: + return None + + captured_raw = raw.get("captured_at") + captured_at = _parse_datetime(captured_raw) + if captured_at is None: + return None + + snapshot = RateLimitSnapshot( + primary=_dict_to_window(raw.get("primary")), + secondary=_dict_to_window(raw.get("secondary")), + ) + if snapshot.primary is None and snapshot.secondary is None: + return None + return StoredRateLimitSnapshot(captured_at=captured_at, snapshot=snapshot) + + +def _parse_datetime(value: Any) -> Optional[datetime]: + if not isinstance(value, str): + return None + text = value.strip() + if not text: + return None + if text.endswith("Z"): + text = text[:-1] + "+00:00" + try: + dt = datetime.fromisoformat(text) + if dt.tzinfo is None: + return dt.replace(tzinfo=timezone.utc) + return dt + except ValueError: + return None + + +def _dict_to_window(value: Any) -> Optional[RateLimitWindow]: + if not isinstance(value, dict): + return None + used = _parse_float(value.get("used_percent")) + if used is None: + return None + window = _parse_int(value.get("window_minutes")) + resets = _parse_int(value.get("resets_in_seconds")) + return RateLimitWindow(used_percent=used, window_minutes=window, resets_in_seconds=resets) + + +def record_rate_limits_from_response(response: Any) -> None: + if response is None: + return + headers = getattr(response, "headers", None) + if headers is None: + return + snapshot = parse_rate_limit_headers(headers) + if snapshot is None: + return + store_rate_limit_snapshot(snapshot) + + +def compute_reset_at(captured_at: datetime, window: RateLimitWindow) -> Optional[datetime]: + if window.resets_in_seconds is None: + return None + try: + return captured_at + timedelta(seconds=int(window.resets_in_seconds)) + except Exception: + return None + diff --git a/chatmock/routes_ollama.py b/chatmock/routes_ollama.py index a7bf99b..2772877 100644 --- a/chatmock/routes_ollama.py +++ b/chatmock/routes_ollama.py @@ -8,6 +8,7 @@ from typing import Any, Dict, List from flask import Blueprint, Response, current_app, jsonify, make_response, request, stream_with_context from .config import BASE_INSTRUCTIONS, GPT5_CODEX_INSTRUCTIONS +from .limits import record_rate_limits_from_response from .http import build_cors_headers from .reasoning import build_reasoning_param, extract_reasoning_from_model_name from .transform import convert_ollama_messages, normalize_ollama_tools @@ -206,6 +207,8 @@ def ollama_chat() -> Response: if error_resp is not None: return error_resp + record_rate_limits_from_response(upstream) + if upstream.status_code >= 400: try: err_body = json.loads(upstream.content.decode("utf-8", errors="ignore")) if upstream.content else {"raw": upstream.text} @@ -225,6 +228,7 @@ def ollama_chat() -> Response: parallel_tool_calls=parallel_tool_calls, reasoning_param=build_reasoning_param(reasoning_effort, reasoning_summary, model_reasoning), ) + record_rate_limits_from_response(upstream2) if err2 is None and upstream2 is not None and upstream2.status_code < 400: upstream = upstream2 else: diff --git a/chatmock/routes_openai.py b/chatmock/routes_openai.py index 5f2001e..13dc314 100644 --- a/chatmock/routes_openai.py +++ b/chatmock/routes_openai.py @@ -7,6 +7,7 @@ from typing import Any, Dict, List from flask import Blueprint, Response, current_app, jsonify, make_response, request from .config import BASE_INSTRUCTIONS, GPT5_CODEX_INSTRUCTIONS +from .limits import record_rate_limits_from_response from .http import build_cors_headers from .reasoning import apply_reasoning_to_message, build_reasoning_param, extract_reasoning_from_model_name from .upstream import normalize_model_name, start_upstream_request @@ -143,6 +144,8 @@ def chat_completions() -> Response: if error_resp is not None: return error_resp + record_rate_limits_from_response(upstream) + created = int(time.time()) if upstream.status_code >= 400: try: @@ -164,6 +167,7 @@ def chat_completions() -> Response: parallel_tool_calls=parallel_tool_calls, reasoning_param=reasoning_param, ) + record_rate_limits_from_response(upstream2) if err2 is None and upstream2 is not None and upstream2.status_code < 400: upstream = upstream2 else: @@ -342,6 +346,8 @@ def completions() -> Response: if error_resp is not None: return error_resp + record_rate_limits_from_response(upstream) + created = int(time.time()) if upstream.status_code >= 400: try: