Files
microsoft-SkillOpt/skillopt/model/qwen_backend.py
Matt Van Horn c31c50be51 fix(model): forward Qwen timeout and only set enable_thinking when true
Two bugs made local vLLM targets score acc=0.000: the router did not
forward 'timeout' to the Qwen backend (so runs used the 300s default),
and qwen_backend always injected chat_template_kwargs.enable_thinking,
which non-Qwen vLLM servers reject or answer with <think> output and no
<answer> tag. Forward timeout and only set the field when enabled.

Closes #28

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 07:41:35 -07:00

457 lines
15 KiB
Python

"""OpenAI-compatible Qwen chat backend for optimizer and target paths."""
from __future__ import annotations
from dataclasses import dataclass
import json
import os
import threading
import time
import urllib.error
import urllib.request
from typing import Any
from skillopt.model.common import (
CompatAssistantMessage,
CompatToolCall,
CompatToolFunction,
TokenTracker,
default_model_for_backend,
)
@dataclass
class QwenChatConfig:
base_url: str
api_key: str
timeout_seconds: float
max_tokens: int
temperature: float | None
enable_thinking: bool
deployment: str
def _parse_bool(value: Any, default: bool = False) -> bool:
if value is None:
return default
return str(value).strip().lower() in {"1", "true", "yes", "on"}
def _parse_optional_float(value: Any) -> float | None:
if value is None:
return None
raw = str(value).strip()
return float(raw) if raw else None
def _parse_int(value: Any, default: int) -> int:
if value is None:
return default
raw = str(value).strip()
return int(raw) if raw else default
def _role_env(role: str, key: str, default: str) -> str:
role_key = f"{role.upper()}_QWEN_CHAT_{key}"
generic_key = f"QWEN_CHAT_{key}"
return os.environ.get(role_key) or os.environ.get(generic_key) or default
def _initial_config(role: str) -> QwenChatConfig:
role_upper = role.upper()
deployment_env = "OPTIMIZER_DEPLOYMENT" if role == "optimizer" else "TARGET_DEPLOYMENT"
return QwenChatConfig(
base_url=_role_env(role, "BASE_URL", "http://localhost:8000/v1"),
api_key=_role_env(role, "API_KEY", ""),
timeout_seconds=float(_role_env(role, "TIMEOUT_SECONDS", "300") or 300),
max_tokens=_parse_int(_role_env(role, "MAX_TOKENS", "8000"), 8000),
temperature=_parse_optional_float(_role_env(role, "TEMPERATURE", "0.7")),
enable_thinking=_parse_bool(_role_env(role, "ENABLE_THINKING", "false")),
deployment=(
os.environ.get(f"{role_upper}_QWEN_CHAT_MODEL")
or os.environ.get("QWEN_CHAT_MODEL")
or os.environ.get(deployment_env)
or default_model_for_backend("qwen_chat")
),
)
OPTIMIZER_CONFIG = _initial_config("optimizer")
TARGET_CONFIG = _initial_config("target")
_config_lock = threading.Lock()
tracker = TokenTracker()
def _chat_url(config: QwenChatConfig) -> str:
base = config.base_url.rstrip("/")
if base.endswith("/chat/completions"):
return base
return f"{base}/chat/completions"
def _json_safe(value: Any) -> Any:
if value is None or isinstance(value, (str, int, float, bool)):
return value
if isinstance(value, list):
return [_json_safe(item) for item in value]
if isinstance(value, dict):
return {str(key): _json_safe(val) for key, val in value.items()}
model_dump = getattr(value, "model_dump", None)
if callable(model_dump):
try:
return model_dump(mode="json")
except TypeError:
return model_dump()
return str(value)
def _usage_from_payload(payload: dict[str, Any]) -> dict[str, int]:
usage = payload.get("usage") or {}
prompt_tokens = int(usage.get("prompt_tokens") or usage.get("input_tokens") or 0)
completion_tokens = int(usage.get("completion_tokens") or usage.get("output_tokens") or 0)
total_tokens = int(usage.get("total_tokens") or (prompt_tokens + completion_tokens))
return {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": total_tokens,
}
def _compat_message_from_payload(message: dict[str, Any], choice: dict[str, Any]) -> CompatAssistantMessage:
content = message.get("content") or ""
if not isinstance(content, str):
content = json.dumps(content, ensure_ascii=False)
tool_calls: list[CompatToolCall] = []
for index, tool_call in enumerate(message.get("tool_calls") or [], start=1):
function = tool_call.get("function") or {}
tool_calls.append(
CompatToolCall(
id=str(tool_call.get("id") or f"qwen_tool_{index}"),
type=str(tool_call.get("type") or "function"),
function=CompatToolFunction(
name=str(function.get("name") or ""),
arguments=str(function.get("arguments") or "{}"),
),
)
)
return CompatAssistantMessage(
content=content,
tool_calls=tool_calls,
metadata={
"finish_reason": choice.get("finish_reason"),
"choice0": _json_safe(choice),
},
)
def _post_chat_completion(
payload: dict[str, Any],
timeout: float | None,
config: QwenChatConfig,
) -> dict[str, Any]:
headers = {"Content-Type": "application/json"}
if config.api_key:
headers["Authorization"] = f"Bearer {config.api_key}"
req = urllib.request.Request(
_chat_url(config),
data=json.dumps(payload, ensure_ascii=False).encode("utf-8"),
headers=headers,
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=timeout or config.timeout_seconds) as resp:
raw = resp.read().decode("utf-8")
except urllib.error.HTTPError as e:
body = e.read().decode("utf-8", errors="replace")
raise RuntimeError(f"Qwen chat API returned HTTP {e.code}: {body}") from e
except urllib.error.URLError as e:
raise RuntimeError(f"Qwen chat API request failed: {e}") from e
try:
return json.loads(raw)
except json.JSONDecodeError as e:
raise RuntimeError(f"Qwen chat API returned non-JSON response: {raw[:1000]}") from e
def _chat_messages_impl(
messages: list[dict[str, Any]],
max_completion_tokens: int,
retries: int,
stage: str,
*,
role: str,
tools: list[dict[str, Any]] | None = None,
tool_choice: str | dict[str, Any] | None = None,
return_message: bool = False,
deployment: str | None = None,
timeout: float | None = None,
) -> tuple[Any, dict[str, int]]:
config = OPTIMIZER_CONFIG if role == "optimizer" else TARGET_CONFIG
payload: dict[str, Any] = {
"model": deployment or config.deployment,
"messages": _json_safe(messages),
"max_tokens": min(max_completion_tokens, config.max_tokens),
}
if config.enable_thinking:
payload["chat_template_kwargs"] = {"enable_thinking": True}
if config.temperature is not None:
payload["temperature"] = config.temperature
if tools:
payload["tools"] = _json_safe(tools)
if tool_choice is not None:
payload["tool_choice"] = _json_safe(tool_choice)
last_err: Exception | None = None
for attempt in range(retries):
try:
data = _post_chat_completion(payload, timeout, config)
choices = data.get("choices") or []
if not choices:
raise RuntimeError(f"Qwen chat API returned no choices: {data}")
choice0 = choices[0]
message = choice0.get("message") or {}
text = message.get("content") or ""
if not isinstance(text, str):
text = json.dumps(text, ensure_ascii=False)
usage_info = _usage_from_payload(data)
tracker.record(stage, usage_info["prompt_tokens"], usage_info["completion_tokens"])
if return_message:
return _compat_message_from_payload(message, choice0), usage_info
return text, usage_info
except Exception as e: # noqa: BLE001
last_err = e
time.sleep(min(2 ** attempt, 30))
raise RuntimeError(f"Qwen chat call failed after {retries} retries: {last_err}")
def configure_qwen_chat(
*,
base_url: str | None = None,
api_key: str | None = None,
temperature: float | str | None = None,
timeout_seconds: float | str | None = None,
max_tokens: int | str | None = None,
enable_thinking: bool | str | None = None,
optimizer_base_url: str | None = None,
optimizer_api_key: str | None = None,
optimizer_temperature: float | str | None = None,
optimizer_timeout_seconds: float | str | None = None,
optimizer_max_tokens: int | str | None = None,
optimizer_enable_thinking: bool | str | None = None,
target_base_url: str | None = None,
target_api_key: str | None = None,
target_temperature: float | str | None = None,
target_timeout_seconds: float | str | None = None,
target_max_tokens: int | str | None = None,
target_enable_thinking: bool | str | None = None,
) -> None:
with _config_lock:
if base_url is not None:
os.environ["QWEN_CHAT_BASE_URL"] = str(base_url).strip()
if api_key is not None:
os.environ["QWEN_CHAT_API_KEY"] = str(api_key).strip()
if temperature is not None:
os.environ["QWEN_CHAT_TEMPERATURE"] = str(temperature).strip()
if timeout_seconds is not None:
os.environ["QWEN_CHAT_TIMEOUT_SECONDS"] = str(timeout_seconds)
if max_tokens is not None:
os.environ["QWEN_CHAT_MAX_TOKENS"] = str(max_tokens)
if enable_thinking is not None:
os.environ["QWEN_CHAT_ENABLE_THINKING"] = (
"true" if _parse_bool(enable_thinking) else "false"
)
_update_config(
OPTIMIZER_CONFIG,
"optimizer",
base_url=optimizer_base_url if optimizer_base_url is not None else base_url,
api_key=optimizer_api_key if optimizer_api_key is not None else api_key,
temperature=(
optimizer_temperature
if optimizer_temperature is not None
else temperature
),
timeout_seconds=(
optimizer_timeout_seconds
if optimizer_timeout_seconds is not None
else timeout_seconds
),
max_tokens=optimizer_max_tokens if optimizer_max_tokens is not None else max_tokens,
enable_thinking=(
optimizer_enable_thinking
if optimizer_enable_thinking is not None
else enable_thinking
),
)
_update_config(
TARGET_CONFIG,
"target",
base_url=target_base_url if target_base_url is not None else base_url,
api_key=target_api_key if target_api_key is not None else api_key,
temperature=target_temperature if target_temperature is not None else temperature,
timeout_seconds=(
target_timeout_seconds
if target_timeout_seconds is not None
else timeout_seconds
),
max_tokens=target_max_tokens if target_max_tokens is not None else max_tokens,
enable_thinking=(
target_enable_thinking
if target_enable_thinking is not None
else enable_thinking
),
)
def _update_config(
config: QwenChatConfig,
role: str,
*,
base_url: str | None = None,
api_key: str | None = None,
temperature: float | str | None = None,
timeout_seconds: float | str | None = None,
max_tokens: int | str | None = None,
enable_thinking: bool | str | None = None,
) -> None:
env_prefix = role.upper()
if base_url is not None:
config.base_url = str(base_url).strip() or config.base_url
os.environ[f"{env_prefix}_QWEN_CHAT_BASE_URL"] = config.base_url
if api_key is not None:
config.api_key = str(api_key).strip()
os.environ[f"{env_prefix}_QWEN_CHAT_API_KEY"] = config.api_key
if temperature is not None:
raw = str(temperature).strip()
config.temperature = float(raw) if raw else None
os.environ[f"{env_prefix}_QWEN_CHAT_TEMPERATURE"] = raw
if timeout_seconds is not None:
config.timeout_seconds = float(timeout_seconds)
os.environ[f"{env_prefix}_QWEN_CHAT_TIMEOUT_SECONDS"] = str(timeout_seconds)
if max_tokens is not None:
config.max_tokens = int(max_tokens)
os.environ[f"{env_prefix}_QWEN_CHAT_MAX_TOKENS"] = str(max_tokens)
if enable_thinking is not None:
config.enable_thinking = _parse_bool(enable_thinking)
os.environ[f"{env_prefix}_QWEN_CHAT_ENABLE_THINKING"] = (
"true" if config.enable_thinking else "false"
)
def get_max_tokens() -> int:
return TARGET_CONFIG.max_tokens
def chat_optimizer(
system: str,
user: str,
max_completion_tokens: int = 16384,
retries: int = 5,
stage: str = "optimizer",
reasoning_effort: str | None = None,
timeout: float | None = None,
) -> tuple[str, dict[str, int]]:
del reasoning_effort
messages = [{"role": "system", "content": system}, {"role": "user", "content": user}]
return _chat_messages_impl(
messages,
max_completion_tokens,
retries,
stage,
role="optimizer",
timeout=timeout,
)
def chat_target(
system: str,
user: str,
max_completion_tokens: int = 16384,
retries: int = 5,
stage: str = "target",
reasoning_effort: str | None = None,
timeout: float | None = None,
) -> tuple[str, dict[str, int]]:
del reasoning_effort
messages = [{"role": "system", "content": system}, {"role": "user", "content": user}]
return _chat_messages_impl(
messages,
max_completion_tokens,
retries,
stage,
role="target",
timeout=timeout,
)
def chat_optimizer_messages(
messages: list[dict[str, Any]],
max_completion_tokens: int = 16384,
retries: int = 5,
stage: str = "optimizer",
reasoning_effort: str | None = None,
*,
tools: list[dict[str, Any]] | None = None,
tool_choice: str | dict[str, Any] | None = None,
return_message: bool = False,
timeout: float | None = None,
) -> tuple[Any, dict[str, int]]:
del reasoning_effort
return _chat_messages_impl(
messages,
max_completion_tokens,
retries,
stage,
role="optimizer",
tools=tools,
tool_choice=tool_choice,
return_message=return_message,
timeout=timeout,
)
def chat_target_messages(
messages: list[dict[str, Any]],
max_completion_tokens: int = 16384,
retries: int = 5,
stage: str = "target",
reasoning_effort: str | None = None,
*,
tools: list[dict[str, Any]] | None = None,
tool_choice: str | dict[str, Any] | None = None,
return_message: bool = False,
timeout: float | None = None,
) -> tuple[Any, dict[str, int]]:
del reasoning_effort
return _chat_messages_impl(
messages,
max_completion_tokens,
retries,
stage,
role="target",
tools=tools,
tool_choice=tool_choice,
return_message=return_message,
timeout=timeout,
)
def get_token_summary() -> dict[str, dict[str, int]]:
return tracker.summary()
def reset_token_tracker() -> None:
tracker.reset()
def set_reasoning_effort(effort: str | None) -> None:
del effort
def set_target_deployment(deployment: str) -> None:
TARGET_CONFIG.deployment = deployment or default_model_for_backend("qwen_chat")
os.environ["TARGET_DEPLOYMENT"] = TARGET_CONFIG.deployment
def set_optimizer_deployment(deployment: str) -> None:
OPTIMIZER_CONFIG.deployment = deployment or default_model_for_backend("qwen_chat")
os.environ["OPTIMIZER_DEPLOYMENT"] = OPTIMIZER_CONFIG.deployment