"""
SkillOpt WebUI — Configure, launch, and monitor training from your browser.
Usage:
python -m skillopt_webui.app [--port PORT] [--share]
"""
import argparse
import glob
import json
import os
import re
import signal
import socket
import subprocess
import sys
import threading
from pathlib import Path
from urllib.parse import urlparse

import gradio as gr
import yaml

from skillopt.config import flatten_config
from skillopt.config import load_config as load_merged_config
# Directory two levels above this file (presumably the repository root when this
# module lives at skillopt_webui/app.py). configs/, scripts/train.py, .env and
# .secrets/ are all resolved relative to it.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# ─── Config helpers ──────────────────────────────────────────────────────────
def discover_configs() -> list[str]:
"""Find all YAML configs under configs/."""
pattern = str(PROJECT_ROOT / "configs" / "**" / "*.yaml")
paths = sorted(glob.glob(pattern, recursive=True))
return [os.path.relpath(p, PROJECT_ROOT) for p in paths
if "_base_" not in p]
def load_config(path: str) -> dict:
    """Load a YAML config file given relative to PROJECT_ROOT.

    Parsing uses ``yaml.safe_load``. The file is decoded as UTF-8 regardless of
    the platform locale. An empty document (which ``safe_load`` returns as
    ``None``) is returned as ``{}`` so callers always get a mapping for it.
    """
    with open(PROJECT_ROOT / path, encoding="utf-8") as f:
        data = yaml.safe_load(f)
    return {} if data is None else data
def config_to_display(cfg: dict) -> str:
    """Render *cfg* as block-style YAML for display, preserving key order.

    ``allow_unicode=True`` writes non-ASCII text (e.g. non-English prompts)
    verbatim instead of as escape sequences, which is what a human reader of
    the UI wants to see.
    """
    return yaml.dump(
        cfg,
        default_flow_style=False,
        sort_keys=False,
        allow_unicode=True,
    )
def _can_connect_to_url(url: str, timeout: float = 0.5) -> bool:
parsed = urlparse(url)
host = parsed.hostname
if not host:
return False
port = parsed.port or (443 if parsed.scheme == "https" else 80)
try:
with socket.create_connection((host, port), timeout=timeout):
return True
except OSError:
return False
def _load_env_file(path: Path, env: dict[str, str]) -> None:
for line in path.read_text().splitlines():
line = line.strip()
if line.startswith("export "):
line = line[len("export "):].strip()
if line and not line.startswith("#") and "=" in line:
key, value = line.split("=", 1)
env[key.strip()] = value.strip().strip("\"'")
def build_training_env() -> dict[str, str]:
    """Assemble the environment used by both preflight checks and the training run.

    Construction steps:

    1. Start from a copy of the current process environment.
    2. Set ``PYTHONUNBUFFERED=1`` so the child's output is not buffered.
    3. Layer ``PROJECT_ROOT/.env``, then every ``PROJECT_ROOT/.secrets/*.env``
       in sorted order, on top. Later files override earlier values.
    4. Fill each missing or empty ``AZURE_OPENAI_<X>`` from the matching
       ``OPTIMIZER_AZURE_OPENAI_<X>``, so target/default endpoints inherit the
       optimizer configuration.
    """
    merged = dict(os.environ)
    merged["PYTHONUNBUFFERED"] = "1"

    env_files = []
    root_env = PROJECT_ROOT / ".env"
    if root_env.is_file():
        env_files.append(root_env)
    secrets_dir = PROJECT_ROOT / ".secrets"
    if secrets_dir.is_dir():
        env_files += sorted(secrets_dir.glob("*.env"))
    for env_path in env_files:
        _load_env_file(env_path, merged)

    inherited_suffixes = (
        "ENDPOINT", "API_VERSION", "AUTH_MODE",
        "MANAGED_IDENTITY_CLIENT_ID", "AD_SCOPE", "API_KEY",
    )
    for suffix in inherited_suffixes:
        base_key = "AZURE_OPENAI_" + suffix
        fallback = merged.get("OPTIMIZER_" + base_key)
        if fallback and not merged.get(base_key):
            merged[base_key] = fallback
    return merged
def validate_training_config(
    config_path: str,
    overrides: dict,
    env: dict[str, str] | None = None,
) -> str | None:
    """Return an actionable preflight error, or None when training can start.

    Loads the merged config for *config_path* with *overrides* applied as
    ``key=value`` cfg-options (None / "" values are skipped), then checks:

    * every role (optimizer, target) whose backend is ``openai_chat`` resolves
      an endpoint from its role-specific setting, the shared config keys, or
      the environment;
    * every role whose backend is ``qwen_chat`` has a base URL that accepts a
      TCP connection.

    Args:
        config_path: Config path relative to PROJECT_ROOT.
        overrides: Flattened config overrides.
        env: Environment to read fallback settings from. ``None`` means
            ``os.environ``; an explicitly passed (even empty) mapping is used as-is.

    Returns:
        A user-facing message starting with "❌", or None if all checks pass.
    """
    if env is None:
        env = os.environ
    cfg_options = [
        f"{key}={value}" for key, value in overrides.items()
        if value is not None and value != ""
    ]
    try:
        cfg = flatten_config(load_merged_config(str(PROJECT_ROOT / config_path), cfg_options))
    except Exception as exc:  # any load/merge failure is reported to the UI
        return f"❌ Invalid config: {exc}"

    # openai_chat roles: role-specific endpoint, else the shared one.
    shared_endpoint = (
        cfg.get("azure_openai_endpoint")
        or cfg.get("azure_endpoint")
        or env.get("AZURE_OPENAI_ENDPOINT")
    )
    missing_openai_roles = []
    for role in ("optimizer", "target"):
        if cfg.get(f"{role}_backend") != "openai_chat":
            continue
        role_endpoint = (
            cfg.get(f"{role}_azure_openai_endpoint")
            or env.get(f"{role.upper()}_AZURE_OPENAI_ENDPOINT")
            or shared_endpoint
        )
        if not role_endpoint:
            missing_openai_roles.append(role)
    if missing_openai_roles:
        configured_backend = cfg.get("model_backend")
        detail = ""
        if configured_backend in {"qwen", "qwen_chat"}:
            # Explain the common trap: model.backend alone does not override
            # explicit per-role backends.
            detail = (
                "\nNote: model.backend is qwen, but explicit optimizer_backend/"
                "target_backend values are still openai_chat."
            )
        return (
            "❌ Model backend is not ready: missing Azure/OpenAI-compatible endpoint "
            f"for {', '.join(missing_openai_roles)}.\n"
            "Set model.azure_openai_endpoint (or AZURE_OPENAI_ENDPOINT), or change "
            "the role backends to the backend you intend to use."
            f"{detail}"
        )

    # qwen_chat roles: the resolved base URL must accept a TCP connection.
    qwen_shared = (
        cfg.get("qwen_chat_base_url")
        or env.get("QWEN_CHAT_BASE_URL")
        or "http://localhost:8000/v1"
    )
    reachable: dict[str, bool] = {}
    qwen_failures = []
    for role in ("optimizer", "target"):
        if cfg.get(f"{role}_backend") != "qwen_chat":
            continue
        base_url = str(
            cfg.get(f"{role}_qwen_chat_base_url")
            or env.get(f"{role.upper()}_QWEN_CHAT_BASE_URL")
            or qwen_shared
        )
        if base_url not in reachable:
            # Probe each distinct URL once; both roles often share one server.
            reachable[base_url] = _can_connect_to_url(base_url)
        if not reachable[base_url]:
            qwen_failures.append(f"{role}={base_url}")
    if qwen_failures:
        return (
            "❌ Model backend is not ready: cannot connect to qwen_chat endpoint "
            f"for {', '.join(qwen_failures)}.\n"
            "Start your OpenAI-compatible Qwen/vLLM server, or set "
            "model.qwen_chat_base_url / OPTIMIZER_QWEN_CHAT_BASE_URL / "
            "TARGET_QWEN_CHAT_BASE_URL to the correct URL."
        )
    return None
# ─── Training process management ────────────────────────────────────────────
class TrainingManager:
"""Manages a single training subprocess."""
def __init__(self):
self._lock = threading.Lock()
self.process = None
self.log_lines: list[str] = []
self.stage = "Idle"
self.step = 0
self.total_steps = 0
self.epoch = 0
self.total_epochs = 0
self.running = False
def start(self, config_path: str, overrides: dict) -> str:
    """Validate the config and launch ``scripts/train.py`` in its own session.

    Args:
        config_path: Config path relative to PROJECT_ROOT, passed as ``--config``.
        overrides: Flattened ``key -> value`` overrides. Entries whose value is
            None or "" are dropped; the rest are passed via ``--cfg-options``.

    Returns:
        A status message for the UI, one of:
        - a warning if a run is already active;
        - the preflight error if validation fails;
        - an error if the process could not be spawned;
        - otherwise a success message.
    """
    with self._lock:
        if self.running:
            return "⚠️ Training already running. Stop it first."

    # Preflight runs outside the lock: it loads configs and may probe network
    # endpoints, and holding the lock would stall log readers meanwhile.
    env = build_training_env()
    preflight_error = validate_training_config(config_path, overrides, env)
    if preflight_error:
        return preflight_error

    cfg_options = [
        f"{k}={v}" for k, v in overrides.items()
        if v is not None and v != ""
    ]
    cmd = [
        sys.executable, "scripts/train.py",
        "--config", config_path,
    ]
    if cfg_options:
        cmd.append("--cfg-options")
        cmd.extend(cfg_options)

    try:
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            # Undecodable bytes become U+FFFD instead of raising inside the
            # reader thread and freezing the UI state.
            errors="replace",
            cwd=str(PROJECT_ROOT),
            bufsize=1,
            env=env,
            start_new_session=True,  # create process group for clean kill
        )
    except Exception as e:
        return f"❌ Failed to start training: {e}"

    with self._lock:
        # Re-check: another start() may have launched a run while this call was
        # validating without the lock. Only one process may be tracked.
        lost_race = self.running
        if not lost_race:
            self.process = proc
            self.log_lines = [f"$ {' '.join(cmd)}\n"]
            self.stage = "Starting"
            self.step = 0
            self.total_steps = 0
            self.epoch = 0
            self.total_epochs = 0
            self.running = True

    if lost_race:
        # Discard the duplicate process (and any children in its group).
        try:
            os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
        except (AttributeError, OSError):
            proc.kill()
        proc.wait()
        proc.stdout.close()
        return "⚠️ Training already running. Stop it first."

    thread = threading.Thread(target=self._read_output, daemon=True)
    thread.start()
    return "✅ Training started!"
def _read_output(self):
for line in self.process.stdout:
with self._lock:
self.log_lines.append(line)
self._parse_stage(line)
if len(self.log_lines) > 5000:
self.log_lines = self.log_lines[-4000:]
self.process.wait()
with self._lock:
self.running = False
self.stage = f"Finished (exit={self.process.returncode})"
def _parse_stage(self, line: str):
line_lower = line.lower()
if "1/6 rollout" in line_lower or ("rollout" in line_lower and "worker" in line_lower):
self.stage = "🎯 Rollout"
elif "2/6 reflect" in line_lower or ("reflect" in line_lower and "patch" in line_lower):
self.stage = "🔍 Reflect"
elif "3/6 aggregate" in line_lower or "merge" in line_lower:
self.stage = "🔗 Aggregate"
elif "4/6 select" in line_lower:
self.stage = "✂️ Select"
elif "5/6 update" in line_lower:
self.stage = "📝 Update"
elif "6/6" in line_lower or ("gate" in line_lower and "score" in line_lower):
self.stage = "🚦 Gate"
elif "slow update" in line_lower:
self.stage = "🔄 Slow Update"
elif "meta skill" in line_lower:
self.stage = "🧠 Meta Skill"
elif "baseline" in line_lower and "evaluate" in line_lower:
self.stage = "📊 Baseline"
if "[step" in line_lower:
try:
parts = line.split("[STEP")[1].split("]")[0].split("/")
self.step = int(parts[0].strip())
self.total_steps = int(parts[1].strip())
except (IndexError, ValueError):
pass
if "[epoch" in line_lower:
try:
parts = line.split("[EPOCH")[1].split("]")[0].split("/")
self.epoch = int(parts[0].strip())
self.total_epochs = int(parts[1].strip())
except (IndexError, ValueError):
pass
def stop(self) -> str:
    """Terminate the running training process group, escalating if needed.

    Shutdown sequence:
    1. Send SIGTERM to the child's process group, falling back to
       ``Popen.terminate()`` if that fails.
    2. Wait up to 5 seconds.
    3. If the process is still alive, send SIGKILL (or ``Popen.kill()``)
       and wait again.

    The lock is not held while waiting, so the log-reader thread can keep
    draining output during shutdown.

    Returns:
        "🛑 Training stopped." if a run was active, else "No training running."
    """
    with self._lock:
        proc = self.process
        if not (proc and self.running):
            return "No training running."
    try:
        # Kill entire process group (children included); start() launches the
        # child with start_new_session=True so it leads its own group.
        os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
    except (AttributeError, OSError):
        proc.terminate()
    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        # SIGTERM was ignored or handled slowly: force it.
        try:
            os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
        except (AttributeError, OSError):
            proc.kill()
        proc.wait()
    with self._lock:
        self.running = False
        self.stage = "Stopped"
    return "🛑 Training stopped."
def get_logs(self) -> str:
    """Return the newest 500 captured log lines concatenated into one string."""
    with self._lock:
        # Slicing copies, so joining can safely happen after the lock is released.
        recent = self.log_lines[-500:]
    return "".join(recent)
def get_colored_logs_html(self) -> str:
"""Render last 300 log lines with color-coded stages."""
import html as html_mod
with self._lock:
lines = list(self.log_lines[-300:])
parts = []
for line in lines:
# Rebrand: display "skillopt" instead of "reflact" in logs
line_display = line.replace("reflact", "skillopt").replace("ReflACT", "SkillOpt").replace("Reflact", "Skillopt").replace("REFLACT", "SKILLOPT")
escaped = html_mod.escape(line_display.rstrip("\n"))
low = line.lower()
if "[epoch" in low:
color = "#f59e0b" # amber
weight = "700"
elif "[step" in low:
color = "#8b5cf6" # purple
weight = "700"
elif "rollout]" in low or "1/6" in low:
color = "#3b82f6" # blue
elif "reflect" in low or "2/6" in low:
color = "#f97316" # orange
elif "aggregate" in low or "3/6" in low or "merge" in low:
color = "#06b6d4" # cyan
elif "select" in low or "4/6" in low:
color = "#ec4899" # pink
elif "update" in low or "5/6" in low:
color = "#10b981" # green
elif "gate" in low or "6/6" in low:
color = "#ef4444" # red
elif "slow update" in low:
color = "#f59e0b" # amber
weight = "700"
elif "meta skill" in low:
color = "#a855f7" # violet
weight = "700"
elif "baseline" in low:
color = "#6366f1" # indigo
weight = "700"
elif "[rollout]" in low:
# per-item rollout progress
if "hard=1" in line:
color = "#22c55e" # green for correct
elif "hard=0" in line:
color = "#f87171" # red for wrong
elif "timeout" in low:
color = "#fbbf24" # yellow for timeout
else:
color = "#94a3b8" # gray
weight = "400"
elif "error" in low or "fail" in low:
color = "#ef4444"
weight = "700"
elif "========" in line:
color = "#64748b" # separator
weight = "400"
else:
color = "#e2e8f0" # default light gray
weight = "400"
if "weight" not in dir():
weight = "400"
parts.append(f'{escaped}')
weight = "400" # reset
log_html = "
".join(parts) if parts else 'No logs yet. Click Refresh after launching training.'
return f'''