diff --git a/chatmock.py b/chatmock.py index 9a1b923..e776859 100644 --- a/chatmock.py +++ b/chatmock.py @@ -96,6 +96,62 @@ def create_app( reasoning["summary"] = summary return reasoning + def _to_data_url(image_str: str) -> str: + if not isinstance(image_str, str) or not image_str: + return image_str + s = image_str.strip() + if s.startswith("data:image/"): + return s + if s.startswith("http://") or s.startswith("https://"): + return s + b64 = s.replace("\n", "").replace("\r", "") + kind = "image/png" + if b64.startswith("/9j/"): + kind = "image/jpeg" + elif b64.startswith("iVBORw0KGgo"): + kind = "image/png" + elif b64.startswith("R0lGOD"): + kind = "image/gif" + return f"data:{kind};base64,{b64}" + + def _convert_ollama_messages(messages: List[Dict[str, Any]] | None, top_images: List[str] | None) -> List[Dict[str, Any]]: + out: List[Dict[str, Any]] = [] + msgs = messages if isinstance(messages, list) else [] + for m in msgs: + if not isinstance(m, dict): + continue + role = m.get("role") or "user" + content = m.get("content") + images = m.get("images") if isinstance(m.get("images"), list) else [] + parts = [] + if isinstance(content, list): + for p in content: + if isinstance(p, dict) and p.get("type") == "text" and isinstance(p.get("text"), str): + parts.append({"type": "text", "text": p.get("text")}) + elif isinstance(content, str) and content.strip(): + parts.append({"type": "text", "text": content}) + for img in images: + url = _to_data_url(img) + if isinstance(url, str) and url: + parts.append({"type": "image_url", "image_url": {"url": url}}) + if not parts: + parts.append({"type": "text", "text": ""}) + out.append({"role": role, "content": parts}) + if isinstance(top_images, list) and top_images: + attach_to = None + for i in range(len(out) - 1, -1, -1): + if out[i].get("role") == "user": + attach_to = out[i] + break + if attach_to is None: + attach_to = {"role": "user", "content": []} + out.append(attach_to) + for img in top_images: + url = _to_data_url(img) + if isinstance(url, str) and url: + attach_to["content"].append({"type": "image_url", "image_url": {"url": url}}) + return out + @app.route("/v1/chat/completions", methods=["POST", "OPTIONS"]) def chat_completions() -> Response: if request.method == "OPTIONS": @@ -320,6 +376,297 @@ def create_app( resp.headers.setdefault(k, v) return resp + _OLLAMA_FAKE_EVAL = { + "total_duration": 8497226791, + "load_duration": 1747193958, + "prompt_eval_count": 24, + "prompt_eval_duration": 269219750, + "eval_count": 247, + "eval_duration": 6413802458, + } + + @app.route("/api/tags", methods=["GET", "OPTIONS"]) + def ollama_tags() -> Response: + if request.method == "OPTIONS": + resp = make_response("", 204) + for k, v in build_cors_headers().items(): + resp.headers[k] = v + return resp + model_id = "gpt-5" + models = [{ + "name": model_id, + "model": model_id, + "modified_at": "2023-10-01T00:00:00Z", + "size": 815319791, + "digest": "8648f39daa8fbf5b18c7b4e6a8fb4990c692751d49917417b8842ca5758e7ffc", + "details": { + "parent_model": "", + "format": "gguf", + "family": "llama", + "families": ["llama"], + "parameter_size": "8.0B", + "quantization_level": "Q4_0", + }, + }] + resp = make_response(jsonify({"models": models}), 200) + for k, v in build_cors_headers().items(): + resp.headers.setdefault(k, v) + return resp + + @app.route("/api/show", methods=["POST", "OPTIONS"]) + def ollama_show() -> Response: + if request.method == "OPTIONS": + resp = make_response("", 204) + for k, v in build_cors_headers().items(): + resp.headers[k] = v + return resp + try: + payload = request.get_json(silent=True) or {} + except Exception: + payload = {} + model = payload.get("model") + if not isinstance(model, str) or not model.strip(): + return jsonify({"error": "Model not found"}), 400 + v1_show_response = { + "modelfile": "# Modelfile generated by \"ollama show\"\n# To build a new Modelfile based on this one, replace the FROM line with:\n# FROM llava:latest\n\nFROM /models/blobs/sha256:placeholder\nTEMPLATE \"\"\"{{ .System }}\nUSER: {{ .Prompt }}\nASSISTANT: \"\"\"\nPARAMETER num_ctx 100000\nPARAMETER stop \"\"\nPARAMETER stop \"USER:\"\nPARAMETER stop \"ASSISTANT:\"", + "parameters": "num_keep 24\nstop \"<|start_header_id|>\"\nstop \"<|end_header_id|>\"\nstop \"<|eot_id|>\"", + "template": "{{ if .System }}<|start_header_id|>system<|end_header_id|>\n\n{{ .System }}<|eot_id|>{{ end }}{{ if .Prompt }}<|start_header_id|>user<|end_header_id|>\n\n{{ .Prompt }}<|eot_id|>{{ end }}<|start_header_id|>assistant<|end_header_id|>\n\n{{ .Response }}<|eot_id|>", + "details": { + "parent_model": "", + "format": "gguf", + "family": "llama", + "families": ["llama"], + "parameter_size": "8.0B", + "quantization_level": "Q4_0", + }, + "model_info": { + "general.architecture": "llama", + "general.file_type": 2, + "llama.context_length": 2000000, + }, + "capabilities": ["completion", "vision"], + } + resp = make_response(jsonify(v1_show_response), 200) + for k, v in build_cors_headers().items(): + resp.headers.setdefault(k, v) + return resp + + @app.route("/api/chat", methods=["POST", "OPTIONS"]) + def ollama_chat() -> Response: + if request.method == "OPTIONS": + resp = make_response("", 204) + for k, v in build_cors_headers().items(): + resp.headers[k] = v + return resp + + try: + raw = request.get_data(cache=True, as_text=True) or "" + payload = json.loads(raw) if raw else {} + except Exception: + return jsonify({"error": "Invalid JSON body"}), 400 + + model = payload.get("model") + raw_messages = payload.get("messages") + messages = _convert_ollama_messages(raw_messages, payload.get("images") if isinstance(payload.get("images"), list) else None) + stream_req = payload.get("stream") + if stream_req is None: + stream_req = True + stream_req = bool(stream_req) + + if not isinstance(model, str) or not isinstance(messages, list) or not messages: + return jsonify({"error": "Invalid request format"}), 400 + + input_items = convert_chat_messages_to_responses_input(messages) + + upstream, error_resp = _start_upstream_request( + _normalize_model_name(model), + input_items, + instructions=BASE_INSTRUCTIONS, + tools=[], + tool_choice="auto", + parallel_tool_calls=False, + reasoning_param=_build_reasoning_param(None), + ) + if error_resp is not None: + return error_resp + + if upstream.status_code >= 400: + try: + err_body = json.loads(upstream.content.decode("utf-8", errors="ignore")) if upstream.content else {"raw": upstream.text} + except Exception: + err_body = {"raw": upstream.text} + return ( + jsonify({"error": (err_body.get("error", {}) or {}).get("message", "Upstream error")}), + upstream.status_code, + ) + + created_at = str(int(time.time() * 1000)) + + if stream_req: + def _gen(): + compat = (reasoning_compat or "think-tags").strip().lower() + think_open = False + think_closed = False + saw_any_summary = False + pending_summary_paragraph = False + try: + for raw_line in upstream.iter_lines(decode_unicode=False): + if not raw_line: + continue + line = raw_line.decode("utf-8", errors="ignore") if isinstance(raw_line, (bytes, bytearray)) else raw_line + if not line.startswith("data: "): + continue + data = line[len("data: "):].strip() + if not data or data == "[DONE]": + if data == "[DONE]": + break + continue + try: + evt = json.loads(data) + except Exception: + continue + kind = evt.get("type") + + if compat == "think-tags": + if kind == "response.reasoning_summary_part.added": + if saw_any_summary: + pending_summary_paragraph = True + else: + saw_any_summary = True + continue + if kind in ("response.reasoning_summary_text.delta", "response.reasoning_text.delta"): + delta_txt = evt.get("delta") or "" + if not think_open and not think_closed: + out = { + "model": _normalize_model_name(model), + "created_at": created_at, + "message": {"role": "assistant", "content": ""}, + "done": False, + } + yield json.dumps(out, ensure_ascii=False) + "\n\n" + think_open = True + if pending_summary_paragraph: + out = { + "model": _normalize_model_name(model), + "created_at": created_at, + "message": {"role": "assistant", "content": "\n"}, + "done": False, + } + yield json.dumps(out, ensure_ascii=False) + "\n\n" + pending_summary_paragraph = False + if isinstance(delta_txt, str) and delta_txt: + out = { + "model": _normalize_model_name(model), + "created_at": created_at, + "message": {"role": "assistant", "content": delta_txt}, + "done": False, + } + yield json.dumps(out, ensure_ascii=False) + "\n\n" + continue + + if kind == "response.output_text.delta": + if compat == "think-tags" and think_open and not think_closed: + outc = { + "model": _normalize_model_name(model), + "created_at": created_at, + "message": {"role": "assistant", "content": ""}, + "done": False, + } + yield json.dumps(outc, ensure_ascii=False) + "\n\n" + think_open = False + think_closed = True + chunk = evt.get("delta") or "" + if not isinstance(chunk, str) or not chunk: + continue + out = { + "model": _normalize_model_name(model), + "created_at": created_at, + "message": {"role": "assistant", "content": chunk}, + "done": False, + } + yield json.dumps(out, ensure_ascii=False) + "\n\n" + elif kind == "response.completed": + break + finally: + if compat == "think-tags" and think_open and not think_closed: + outc = { + "model": _normalize_model_name(model), + "created_at": created_at, + "message": {"role": "assistant", "content": ""}, + "done": False, + } + yield json.dumps(outc, ensure_ascii=False) + "\n\n" + think_open = False + think_closed = True + upstream.close() + final_out = { + "model": _normalize_model_name(model), + "created_at": created_at, + "message": {"role": "assistant", "content": ""}, + "done": True, + "done_reason": "stop", + } + final_out.update(_OLLAMA_FAKE_EVAL) + yield json.dumps(final_out, ensure_ascii=False) + "\n\n" + + resp = Response(_gen(), status=200, mimetype="text/event-stream", headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}) + for k, v in build_cors_headers().items(): + resp.headers.setdefault(k, v) + return resp + + full_text = "" + reasoning_summary_text = "" + reasoning_full_text = "" + try: + for raw_line in upstream.iter_lines(decode_unicode=False): + if not raw_line: + continue + line = raw_line.decode("utf-8", errors="ignore") if isinstance(raw_line, (bytes, bytearray)) else raw_line + if not line.startswith("data: "): + continue + data = line[len("data: "):].strip() + if not data or data == "[DONE]": + if data == "[DONE]": + break + continue + try: + evt = json.loads(data) + except Exception: + continue + kind = evt.get("type") + if kind == "response.output_text.delta": + full_text += evt.get("delta") or "" + elif kind == "response.reasoning_summary_text.delta": + reasoning_summary_text += evt.get("delta") or "" + elif kind == "response.reasoning_text.delta": + reasoning_full_text += evt.get("delta") or "" + elif kind == "response.completed": + break + finally: + upstream.close() + compat = (reasoning_compat or "think-tags").strip().lower() + if compat == "think-tags": + rtxt_parts = [] + if isinstance(reasoning_summary_text, str) and reasoning_summary_text.strip(): + rtxt_parts.append(reasoning_summary_text) + if isinstance(reasoning_full_text, str) and reasoning_full_text.strip(): + rtxt_parts.append(reasoning_full_text) + rtxt = "\n\n".join([p for p in rtxt_parts if p]) + if rtxt: + full_text = f"{rtxt}" + (full_text or "") + out_json = { + "model": _normalize_model_name(model), + "created_at": created_at, + "message": {"role": "assistant", "content": full_text}, + "done": True, + "done_reason": "stop", + } + out_json.update(_OLLAMA_FAKE_EVAL) + resp = make_response(jsonify(out_json), 200) + for k, v in build_cors_headers().items(): + resp.headers.setdefault(k, v) + return resp + @app.route("/v1/models", methods=["GET", "OPTIONS"]) def list_models() -> Response: if request.method == "OPTIONS":