From bc8c9bc806bf053f60c32fc1a522ad2ba95b65d2 Mon Sep 17 00:00:00 2001
From: Game_Time <108236317+RayBytes@users.noreply.github.com>
Date: Sun, 17 Aug 2025 11:54:52 +0500
Subject: [PATCH] add ollama support
---
chatmock.py | 347 ++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 347 insertions(+)
diff --git a/chatmock.py b/chatmock.py
index 9a1b923..e776859 100644
--- a/chatmock.py
+++ b/chatmock.py
@@ -96,6 +96,62 @@ def create_app(
reasoning["summary"] = summary
return reasoning
+ def _to_data_url(image_str: str) -> str:
+ if not isinstance(image_str, str) or not image_str:
+ return image_str
+ s = image_str.strip()
+ if s.startswith("data:image/"):
+ return s
+ if s.startswith("http://") or s.startswith("https://"):
+ return s
+ b64 = s.replace("\n", "").replace("\r", "")
+ kind = "image/png"
+ if b64.startswith("/9j/"):
+ kind = "image/jpeg"
+ elif b64.startswith("iVBORw0KGgo"):
+ kind = "image/png"
+ elif b64.startswith("R0lGOD"):
+ kind = "image/gif"
+ return f"data:{kind};base64,{b64}"
+
+ def _convert_ollama_messages(messages: List[Dict[str, Any]] | None, top_images: List[str] | None) -> List[Dict[str, Any]]:
+ out: List[Dict[str, Any]] = []
+ msgs = messages if isinstance(messages, list) else []
+ for m in msgs:
+ if not isinstance(m, dict):
+ continue
+ role = m.get("role") or "user"
+ content = m.get("content")
+ images = m.get("images") if isinstance(m.get("images"), list) else []
+ parts = []
+ if isinstance(content, list):
+ for p in content:
+ if isinstance(p, dict) and p.get("type") == "text" and isinstance(p.get("text"), str):
+ parts.append({"type": "text", "text": p.get("text")})
+ elif isinstance(content, str) and content.strip():
+ parts.append({"type": "text", "text": content})
+ for img in images:
+ url = _to_data_url(img)
+ if isinstance(url, str) and url:
+ parts.append({"type": "image_url", "image_url": {"url": url}})
+ if not parts:
+ parts.append({"type": "text", "text": ""})
+ out.append({"role": role, "content": parts})
+ if isinstance(top_images, list) and top_images:
+ attach_to = None
+ for i in range(len(out) - 1, -1, -1):
+ if out[i].get("role") == "user":
+ attach_to = out[i]
+ break
+ if attach_to is None:
+ attach_to = {"role": "user", "content": []}
+ out.append(attach_to)
+ for img in top_images:
+ url = _to_data_url(img)
+ if isinstance(url, str) and url:
+ attach_to["content"].append({"type": "image_url", "image_url": {"url": url}})
+ return out
+
@app.route("/v1/chat/completions", methods=["POST", "OPTIONS"])
def chat_completions() -> Response:
if request.method == "OPTIONS":
@@ -320,6 +376,297 @@ def create_app(
resp.headers.setdefault(k, v)
return resp
+ _OLLAMA_FAKE_EVAL = {
+ "total_duration": 8497226791,
+ "load_duration": 1747193958,
+ "prompt_eval_count": 24,
+ "prompt_eval_duration": 269219750,
+ "eval_count": 247,
+ "eval_duration": 6413802458,
+ }
+
+ @app.route("/api/tags", methods=["GET", "OPTIONS"])
+ def ollama_tags() -> Response:
+ if request.method == "OPTIONS":
+ resp = make_response("", 204)
+ for k, v in build_cors_headers().items():
+ resp.headers[k] = v
+ return resp
+ model_id = "gpt-5"
+ models = [{
+ "name": model_id,
+ "model": model_id,
+ "modified_at": "2023-10-01T00:00:00Z",
+ "size": 815319791,
+ "digest": "8648f39daa8fbf5b18c7b4e6a8fb4990c692751d49917417b8842ca5758e7ffc",
+ "details": {
+ "parent_model": "",
+ "format": "gguf",
+ "family": "llama",
+ "families": ["llama"],
+ "parameter_size": "8.0B",
+ "quantization_level": "Q4_0",
+ },
+ }]
+ resp = make_response(jsonify({"models": models}), 200)
+ for k, v in build_cors_headers().items():
+ resp.headers.setdefault(k, v)
+ return resp
+
+ @app.route("/api/show", methods=["POST", "OPTIONS"])
+ def ollama_show() -> Response:
+ if request.method == "OPTIONS":
+ resp = make_response("", 204)
+ for k, v in build_cors_headers().items():
+ resp.headers[k] = v
+ return resp
+ try:
+ payload = request.get_json(silent=True) or {}
+ except Exception:
+ payload = {}
+ model = payload.get("model")
+ if not isinstance(model, str) or not model.strip():
+ return jsonify({"error": "Model not found"}), 400
+ v1_show_response = {
+ "modelfile": "# Modelfile generated by \"ollama show\"\n# To build a new Modelfile based on this one, replace the FROM line with:\n# FROM llava:latest\n\nFROM /models/blobs/sha256:placeholder\nTEMPLATE \"\"\"{{ .System }}\nUSER: {{ .Prompt }}\nASSISTANT: \"\"\"\nPARAMETER num_ctx 100000\nPARAMETER stop \"\"\nPARAMETER stop \"USER:\"\nPARAMETER stop \"ASSISTANT:\"",
+ "parameters": "num_keep 24\nstop \"<|start_header_id|>\"\nstop \"<|end_header_id|>\"\nstop \"<|eot_id|>\"",
+ "template": "{{ if .System }}<|start_header_id|>system<|end_header_id|>\n\n{{ .System }}<|eot_id|>{{ end }}{{ if .Prompt }}<|start_header_id|>user<|end_header_id|>\n\n{{ .Prompt }}<|eot_id|>{{ end }}<|start_header_id|>assistant<|end_header_id|>\n\n{{ .Response }}<|eot_id|>",
+ "details": {
+ "parent_model": "",
+ "format": "gguf",
+ "family": "llama",
+ "families": ["llama"],
+ "parameter_size": "8.0B",
+ "quantization_level": "Q4_0",
+ },
+ "model_info": {
+ "general.architecture": "llama",
+ "general.file_type": 2,
+ "llama.context_length": 2000000,
+ },
+ "capabilities": ["completion", "vision"],
+ }
+ resp = make_response(jsonify(v1_show_response), 200)
+ for k, v in build_cors_headers().items():
+ resp.headers.setdefault(k, v)
+ return resp
+
+ @app.route("/api/chat", methods=["POST", "OPTIONS"])
+ def ollama_chat() -> Response:
+ if request.method == "OPTIONS":
+ resp = make_response("", 204)
+ for k, v in build_cors_headers().items():
+ resp.headers[k] = v
+ return resp
+
+ try:
+ raw = request.get_data(cache=True, as_text=True) or ""
+ payload = json.loads(raw) if raw else {}
+ except Exception:
+ return jsonify({"error": "Invalid JSON body"}), 400
+
+ model = payload.get("model")
+ raw_messages = payload.get("messages")
+ messages = _convert_ollama_messages(raw_messages, payload.get("images") if isinstance(payload.get("images"), list) else None)
+ stream_req = payload.get("stream")
+ if stream_req is None:
+ stream_req = True
+ stream_req = bool(stream_req)
+
+ if not isinstance(model, str) or not isinstance(messages, list) or not messages:
+ return jsonify({"error": "Invalid request format"}), 400
+
+ input_items = convert_chat_messages_to_responses_input(messages)
+
+ upstream, error_resp = _start_upstream_request(
+ _normalize_model_name(model),
+ input_items,
+ instructions=BASE_INSTRUCTIONS,
+ tools=[],
+ tool_choice="auto",
+ parallel_tool_calls=False,
+ reasoning_param=_build_reasoning_param(None),
+ )
+ if error_resp is not None:
+ return error_resp
+
+ if upstream.status_code >= 400:
+ try:
+ err_body = json.loads(upstream.content.decode("utf-8", errors="ignore")) if upstream.content else {"raw": upstream.text}
+ except Exception:
+ err_body = {"raw": upstream.text}
+ return (
+ jsonify({"error": (err_body.get("error", {}) or {}).get("message", "Upstream error")}),
+ upstream.status_code,
+ )
+
+ created_at = str(int(time.time() * 1000))
+
+ if stream_req:
+ def _gen():
+ compat = (reasoning_compat or "think-tags").strip().lower()
+ think_open = False
+ think_closed = False
+ saw_any_summary = False
+ pending_summary_paragraph = False
+ try:
+ for raw_line in upstream.iter_lines(decode_unicode=False):
+ if not raw_line:
+ continue
+ line = raw_line.decode("utf-8", errors="ignore") if isinstance(raw_line, (bytes, bytearray)) else raw_line
+ if not line.startswith("data: "):
+ continue
+ data = line[len("data: "):].strip()
+ if not data or data == "[DONE]":
+ if data == "[DONE]":
+ break
+ continue
+ try:
+ evt = json.loads(data)
+ except Exception:
+ continue
+ kind = evt.get("type")
+
+ if compat == "think-tags":
+ if kind == "response.reasoning_summary_part.added":
+ if saw_any_summary:
+ pending_summary_paragraph = True
+ else:
+ saw_any_summary = True
+ continue
+ if kind in ("response.reasoning_summary_text.delta", "response.reasoning_text.delta"):
+ delta_txt = evt.get("delta") or ""
+ if not think_open and not think_closed:
+ out = {
+ "model": _normalize_model_name(model),
+ "created_at": created_at,
+ "message": {"role": "assistant", "content": ""},
+ "done": False,
+ }
+ yield json.dumps(out, ensure_ascii=False) + "\n\n"
+ think_open = True
+ if pending_summary_paragraph:
+ out = {
+ "model": _normalize_model_name(model),
+ "created_at": created_at,
+ "message": {"role": "assistant", "content": "\n"},
+ "done": False,
+ }
+ yield json.dumps(out, ensure_ascii=False) + "\n\n"
+ pending_summary_paragraph = False
+ if isinstance(delta_txt, str) and delta_txt:
+ out = {
+ "model": _normalize_model_name(model),
+ "created_at": created_at,
+ "message": {"role": "assistant", "content": delta_txt},
+ "done": False,
+ }
+ yield json.dumps(out, ensure_ascii=False) + "\n\n"
+ continue
+
+ if kind == "response.output_text.delta":
+ if compat == "think-tags" and think_open and not think_closed:
+ outc = {
+ "model": _normalize_model_name(model),
+ "created_at": created_at,
+ "message": {"role": "assistant", "content": ""},
+ "done": False,
+ }
+ yield json.dumps(outc, ensure_ascii=False) + "\n\n"
+ think_open = False
+ think_closed = True
+ chunk = evt.get("delta") or ""
+ if not isinstance(chunk, str) or not chunk:
+ continue
+ out = {
+ "model": _normalize_model_name(model),
+ "created_at": created_at,
+ "message": {"role": "assistant", "content": chunk},
+ "done": False,
+ }
+ yield json.dumps(out, ensure_ascii=False) + "\n\n"
+ elif kind == "response.completed":
+ break
+ finally:
+ if compat == "think-tags" and think_open and not think_closed:
+ outc = {
+ "model": _normalize_model_name(model),
+ "created_at": created_at,
+ "message": {"role": "assistant", "content": ""},
+ "done": False,
+ }
+ yield json.dumps(outc, ensure_ascii=False) + "\n\n"
+ think_open = False
+ think_closed = True
+ upstream.close()
+ final_out = {
+ "model": _normalize_model_name(model),
+ "created_at": created_at,
+ "message": {"role": "assistant", "content": ""},
+ "done": True,
+ "done_reason": "stop",
+ }
+ final_out.update(_OLLAMA_FAKE_EVAL)
+ yield json.dumps(final_out, ensure_ascii=False) + "\n\n"
+
+ resp = Response(_gen(), status=200, mimetype="text/event-stream", headers={"Cache-Control": "no-cache", "Connection": "keep-alive"})
+ for k, v in build_cors_headers().items():
+ resp.headers.setdefault(k, v)
+ return resp
+
+ full_text = ""
+ reasoning_summary_text = ""
+ reasoning_full_text = ""
+ try:
+ for raw_line in upstream.iter_lines(decode_unicode=False):
+ if not raw_line:
+ continue
+ line = raw_line.decode("utf-8", errors="ignore") if isinstance(raw_line, (bytes, bytearray)) else raw_line
+ if not line.startswith("data: "):
+ continue
+ data = line[len("data: "):].strip()
+ if not data or data == "[DONE]":
+ if data == "[DONE]":
+ break
+ continue
+ try:
+ evt = json.loads(data)
+ except Exception:
+ continue
+ kind = evt.get("type")
+ if kind == "response.output_text.delta":
+ full_text += evt.get("delta") or ""
+ elif kind == "response.reasoning_summary_text.delta":
+ reasoning_summary_text += evt.get("delta") or ""
+ elif kind == "response.reasoning_text.delta":
+ reasoning_full_text += evt.get("delta") or ""
+ elif kind == "response.completed":
+ break
+ finally:
+ upstream.close()
+ compat = (reasoning_compat or "think-tags").strip().lower()
+ if compat == "think-tags":
+ rtxt_parts = []
+ if isinstance(reasoning_summary_text, str) and reasoning_summary_text.strip():
+ rtxt_parts.append(reasoning_summary_text)
+ if isinstance(reasoning_full_text, str) and reasoning_full_text.strip():
+ rtxt_parts.append(reasoning_full_text)
+ rtxt = "\n\n".join([p for p in rtxt_parts if p])
+ if rtxt:
+ full_text = f"{rtxt}" + (full_text or "")
+ out_json = {
+ "model": _normalize_model_name(model),
+ "created_at": created_at,
+ "message": {"role": "assistant", "content": full_text},
+ "done": True,
+ "done_reason": "stop",
+ }
+ out_json.update(_OLLAMA_FAKE_EVAL)
+ resp = make_response(jsonify(out_json), 200)
+ for k, v in build_cors_headers().items():
+ resp.headers.setdefault(k, v)
+ return resp
+
@app.route("/v1/models", methods=["GET", "OPTIONS"])
def list_models() -> Response:
if request.method == "OPTIONS":