Merge pull request #32 from Yif-Yang/fix/issue-30-docs-and-template

Fix/issue 30 docs and template
This commit is contained in:
Ziyang Gong
2026-06-02 10:12:48 +08:00
committed by GitHub
6 changed files with 804 additions and 356 deletions

View File

@@ -1,181 +1,393 @@
# Add a New Benchmark
Extend SkillOpt with your own benchmark in ~100 lines of code.
Extend SkillOpt with your own benchmark in ~200 lines of code. We will use
a tiny worked example, `docfaithful`, that scores a target model on
how faithfully it answers questions grounded in a small reference doc.
## Overview
> **Working reference.** The easiest way to bootstrap a new env is to
> read [`skillopt/envs/officeqa/`](https://github.com/microsoft/SkillOpt/tree/main/skillopt/envs/officeqa)
> and copy its structure. Everything below is the same shape, simplified.
To add a benchmark, you need:
## What you need to build
1. **Data Loader** — Loads and splits your dataset
2. **Environment Adapter** — Executes tasks and returns scores
3. **Config** — YAML configuration file
To add a benchmark you implement four things:
## Step 1: Create the Benchmark Package
1. **A `SplitDataLoader` subclass** — knows how to load train / val / test
item dicts from disk.
2. **A rollout helper** — runs the target model on a batch of items
under the current skill and scores each prediction.
3. **An `EnvAdapter` subclass** — wires the loader + rollout helper into
SkillOpt's lifecycle (`build_*_env`, `rollout`, `reflect`,
`get_task_types`).
4. **A YAML config** — references your env name plus the standard
train / optimizer / gradient knobs.
Then a short registration block in `scripts/train.py`'s
`_register_builtins()` makes it discoverable.
---
## Step 1 — Create the package
```bash
mkdir -p skillopt/envs/my_benchmark
touch skillopt/envs/my_benchmark/__init__.py
mkdir -p skillopt/envs/docfaithful
touch skillopt/envs/docfaithful/__init__.py
```
## Step 2: Implement the Data Loader
## Step 2 — Implement the data loader
Create `skillopt/envs/my_benchmark/loader.py`:
`skillopt/envs/docfaithful/loader.py`:
```python
from skillopt.data.base import DataLoader, DataItem
from __future__ import annotations
class MyBenchmarkDataLoader(DataLoader):
"""Load and split your benchmark data."""
def __init__(self, data_dir: str, **kwargs):
super().__init__(**kwargs)
self.data_dir = data_dir
def setup(self, cfg: dict):
"""Initialize splits based on config."""
self.split_mode = cfg.get('split_mode', 'ratio')
# Load your data here
self.items = self._load_items()
self._create_splits(cfg)
def _load_items(self) -> list[DataItem]:
"""Load raw data into DataItem objects."""
items = []
# TODO: Load your data
for entry in your_data:
items.append(DataItem(
id=entry['id'],
input=entry['question'],
ground_truth=entry['answer'],
metadata=entry.get('metadata', {})
))
return items
def get_split_items(self, split: str) -> list[DataItem]:
"""Return items for a given split (train/valid/test)."""
return self.splits[split]
import json
from pathlib import Path
from skillopt.datasets.base import SplitDataLoader
def _normalize(raw: dict) -> dict:
"""Make sure every item has an ``id``. Other keys are env-specific."""
return {
"id": str(raw["uid"]),
"question": raw["question"],
"ground_truth": raw["answer"],
"reference_text": raw.get("reference", ""),
"task_type": raw.get("category", "docfaithful"),
}
class DocFaithfulDataLoader(SplitDataLoader):
    """Read DocFaithful items from the JSON file stored in a split directory."""

    def load_split_items(self, split_path: str) -> list[dict]:
        """Return the normalized items of one split.

        Only the first ``*.json`` file (in sorted order) inside
        ``split_path`` is read; it must contain a JSON array of raw records.

        Raises:
            FileNotFoundError: if ``split_path`` holds no ``*.json`` file.
        """
        # split_path is e.g. data/docfaithful_split/train/
        candidates = sorted(Path(split_path).glob("*.json"))
        if len(candidates) == 0:
            raise FileNotFoundError(f"No .json file found in {split_path}")
        first_file = candidates[0]
        with first_file.open(encoding="utf-8") as handle:
            records = json.load(handle)
        items = []
        for record in records:
            items.append(_normalize(record))
        return items
```
## Step 3: Implement the Environment Adapter
Only `load_split_items()` is mandatory. If you also want to support
`split_mode="ratio"` (auto-split a single raw file into train/val/test),
override `load_raw_items(data_path)` as well — see
`skillopt/datasets/base.py` docstrings.
Create `skillopt/envs/my_benchmark/env.py`:
## Step 3 — Write the rollout helper
`skillopt/envs/docfaithful/rollout.py`:
```python
from skillopt.envs.base import EnvAdapter, TaskResult
from __future__ import annotations
class MyBenchmarkEnv(EnvAdapter):
"""Execute tasks and evaluate results."""
def __init__(self, cfg: dict):
super().__init__(cfg)
async def execute(self, item: DataItem, skill: str, model) -> TaskResult:
"""
Execute a single task.
Args:
item: The data item to process
skill: Current skill document content
model: The target model instance
Returns:
TaskResult with prediction, score, and trajectory
"""
# Build prompt with skill document
prompt = self.build_prompt(item, skill)
# Get model response
response = await model.generate(prompt)
# Extract prediction
prediction = self.parse_response(response)
# Score against ground truth
score = self.evaluate(prediction, item.ground_truth)
return TaskResult(
item_id=item.id,
prediction=prediction,
score=score,
trajectory=[
{"role": "system", "content": skill},
{"role": "user", "content": item.input},
{"role": "assistant", "content": response}
]
import json
import os
from pathlib import Path
from skillopt.model import chat_target
def _score(prediction: str, ground_truth: str) -> tuple[int, float]:
"""Trivial exact-match scorer. Replace with F1 / ROUGE / LLM-judge."""
p = (prediction or "").strip().lower()
g = (ground_truth or "").strip().lower()
hard = int(p == g and bool(g))
soft = 1.0 if hard else 0.0
return hard, soft
def _rollout_one(item: dict, skill_content: str,
                 *, max_completion_tokens: int) -> dict:
    """Run the target model on one item and return its scored result dict.

    The skill document is sent as the system message; the user message is
    the item's question followed by its reference text (empty if absent).
    The reply is scored with ``_score`` against the item's ``ground_truth``.

    Returns a dict with ``id``, ``hard``, ``soft``, ``predicted_answer``,
    ``question``, ``reference_text`` and ``task_type``.
    """
    question_part = f"Question: {item['question']}\n\n"
    reference_part = f"Reference:\n{item.get('reference_text', '')}\n\n"
    prompt = question_part + reference_part + "Answer:"

    # chat_target returns a (text, usage) pair; usage is not needed here.
    answer, _ = chat_target(
        system=skill_content,
        user=prompt,
        max_completion_tokens=max_completion_tokens,
    )
    hard_score, soft_score = _score(answer, item.get("ground_truth", ""))

    record = {
        "id": str(item["id"]),
        "hard": hard_score,
        "soft": soft_score,
        "predicted_answer": answer,
    }
    record["question"] = item.get("question", "")
    record["reference_text"] = item.get("reference_text", "")
    record["task_type"] = item.get("task_type", "docfaithful")
    return record
def run_batch(*, items: list[dict], skill_content: str, out_root: str,
              workers: int = 4, max_completion_tokens: int = 4096) -> list[dict]:
    """Roll out every item sequentially and persist the results.

    Args:
        items: Item dicts to run; each one is passed to ``_rollout_one``.
        skill_content: Current skill document, forwarded to every rollout.
        out_root: Directory that receives ``rollouts.json``; created if
            it does not exist.
        workers: Accepted for interface compatibility but currently
            unused — this example runs strictly sequentially.
        max_completion_tokens: Forwarded to every rollout.

    Returns:
        One result dict per item, in input order.
    """
    os.makedirs(out_root, exist_ok=True)
    # For brevity we go sequentially — swap in concurrent.futures.ThreadPoolExecutor
    # when network / model latency dominates.
    results = [
        _rollout_one(item, skill_content,
                     max_completion_tokens=max_completion_tokens)
        for item in items
    ]
    # ensure_ascii=False emits raw non-ASCII characters, so pin the file
    # encoding instead of relying on the platform default.
    Path(out_root, "rollouts.json").write_text(
        json.dumps(results, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    return results
```
Two design points worth flagging:
- **Scoring lives here, not in `EnvAdapter`.** There is no `evaluate()`
method on the ABC. Whatever signal you put in `hard` (0/1, or a float
in [0, 1] for smoothed reward) and `soft` (float in [0, 1]) is what
the optimizer reads.
- **Use `skillopt.model.chat_target`**, not raw OpenAI/Claude calls.
That routes through whichever **chat** target backend the user
configured (`openai_chat` / `claude_chat` / `qwen_chat` /
`minimax_chat`) without your adapter caring. Exec-style backends
(`codex_exec`, `claude_code_exec`) need env-specific rollout code —
see `skillopt/envs/swebench/` for an example.
## Step 4 — Implement the environment adapter
`skillopt/envs/docfaithful/adapter.py`:
```python
from __future__ import annotations
import os
from skillopt.datasets.base import BatchSpec
from skillopt.envs.base import EnvAdapter
from skillopt.envs.docfaithful.loader import DocFaithfulDataLoader
from skillopt.envs.docfaithful.rollout import run_batch
from skillopt.gradient.reflect import run_minibatch_reflect
class DocFaithfulAdapter(EnvAdapter):
"""SkillOpt adapter for the DocFaithful benchmark."""
def __init__(
self,
split_dir: str = "",
data_path: str = "",
split_mode: str = "split_dir",
split_ratio: str = "2:1:7",
split_seed: int = 42,
split_output_dir: str = "",
workers: int = 4,
analyst_workers: int = 4,
failure_only: bool = False,
minibatch_size: int = 8,
edit_budget: int = 4,
seed: int = 42,
limit: int = 0,
max_completion_tokens: int = 4096,
) -> None:
self.workers = workers
self.analyst_workers = analyst_workers
self.failure_only = failure_only
self.minibatch_size = minibatch_size
self.edit_budget = edit_budget
self.max_completion_tokens = int(max_completion_tokens)
self.dataloader = DocFaithfulDataLoader(
split_dir=split_dir,
data_path=data_path,
split_mode=split_mode,
split_ratio=split_ratio,
split_seed=split_seed,
split_output_dir=split_output_dir,
seed=seed,
limit=limit,
)
def evaluate(self, prediction: str, ground_truth: str) -> float:
"""
Score a prediction against ground truth.
Returns:
Float between 0.0 and 1.0
"""
# TODO: Implement your scoring logic
# Examples: exact match, F1, ANLS, etc.
return float(prediction.strip() == ground_truth.strip())
def build_prompt(self, item, skill: str) -> str:
"""Combine skill document with task input."""
return f"{skill}\n\n---\n\nQuestion: {item.input}"
def parse_response(self, response: str) -> str:
"""Extract the answer from model response."""
return response.strip()
# ── Lifecycle ───────────────────────────────────────────────────────
def setup(self, cfg: dict) -> None:
super().setup(cfg)
self.dataloader.setup(cfg)
def get_dataloader(self):
return self.dataloader
# ── Env construction ────────────────────────────────────────────────
def build_env_from_batch(self, batch: BatchSpec, **kwargs):
# For dataset-backed envs the "manager" is just the items list.
return list(batch.payload or [])
def build_train_env(self, batch_size: int, seed: int, **kwargs):
batch = self.dataloader.build_train_batch(
batch_size=batch_size, seed=seed, **kwargs
)
return self.build_env_from_batch(batch, **kwargs)
def build_eval_env(self, env_num: int, split: str, seed: int, **kwargs):
batch = self.dataloader.build_eval_batch(
env_num=env_num, split=split, seed=seed, **kwargs
)
return self.build_env_from_batch(batch, **kwargs)
# ── The two real action methods ─────────────────────────────────────
def rollout(self, env_manager, skill_content: str,
out_dir: str, **kwargs) -> list[dict]:
items: list[dict] = env_manager
return run_batch(
items=items,
skill_content=skill_content,
out_root=out_dir,
workers=self.workers,
max_completion_tokens=self.max_completion_tokens,
)
def reflect(self, results: list[dict], skill_content: str,
out_dir: str, **kwargs) -> list[dict | None]:
return run_minibatch_reflect(
results=results,
skill_content=skill_content,
prediction_dir=kwargs.get(
"prediction_dir", os.path.join(out_dir, "predictions")
),
patches_dir=kwargs.get(
"patches_dir", os.path.join(out_dir, "patches")
),
workers=self.analyst_workers,
failure_only=self.failure_only,
minibatch_size=self.minibatch_size,
edit_budget=self.edit_budget,
random_seed=kwargs.get("random_seed"),
error_system=self.get_error_minibatch_prompt(),
success_system=self.get_success_minibatch_prompt(),
step_buffer_context=kwargs.get("step_buffer_context", ""),
update_mode=getattr(self, "_cfg", {}).get("skill_update_mode", "patch"),
)
def get_task_types(self) -> list[str]:
seen: list[str] = []
for item in (
self.dataloader.train_items
+ self.dataloader.val_items
+ self.dataloader.test_items
):
tt = str(item.get("task_type") or "docfaithful")
if tt not in seen:
seen.append(tt)
return seen or ["docfaithful"]
```
## Step 4: Register the Benchmark
### What the rollout actually does
Add to `skillopt/envs/__init__.py`:
Look back at `run_batch` from Step 3 — it sends each `item["question"]`
(together with its `reference_text`) to the target model with
`skill_content` as the system prompt, scores the answer against
`item["ground_truth"]`, and returns a list of dicts:
```python
from .my_benchmark.env import MyBenchmarkEnv
from .my_benchmark.loader import MyBenchmarkDataLoader
BENCHMARK_REGISTRY = {
# ... existing benchmarks ...
'my_benchmark': {
'env': MyBenchmarkEnv,
'loader': MyBenchmarkDataLoader,
},
}
[
{"id": "ex_001", "hard": 1, "soft": 0.92,
"predicted_answer": "...", "question": "...",
"reference_text": "..."},
{"id": "ex_002", "hard": 0, "soft": 0.13, "fail_reason": "...", ...},
...
]
```
## Step 5: Create Config
The trainer only requires `id`, `hard`, `soft`. The rest is preserved on
`RolloutResult.extras` (see `skillopt/types.py`) and is what your
`reflect()` consumes via `run_minibatch_reflect`.
Create `configs/my_benchmark/default.yaml`:
## Step 5 — Register the adapter
Edit [`scripts/train.py`](https://github.com/microsoft/SkillOpt/blob/main/scripts/train.py)
and add to `_register_builtins()`:
```python
try:
from skillopt.envs.docfaithful.adapter import DocFaithfulAdapter
_ENV_REGISTRY["docfaithful"] = DocFaithfulAdapter
except ImportError:
pass # docfaithful deps not installed — skip
```
There is **no `BENCHMARK_REGISTRY` dict in `skillopt/envs/__init__.py`** —
the registry lives in `scripts/train.py` and is populated lazily so that
optional deps don't break `--help`.
## Step 6 — Create the YAML config
`configs/docfaithful/default.yaml`:
```yaml
_base_: ['../_base_/default.yaml']
_base_: ../_base_/default.yaml # NOTE: string, not list
env:
name: my_benchmark
data_path: data/my_benchmark
split_mode: ratio
split_ratio: "2:1:7"
model:
reasoning_effort: medium
train:
batch_size: 16
accumulation: 1
num_epochs: 4
batch_size: 40
gradient:
minibatch_size: 8
merge_batch_size: 8
optimizer:
learning_rate: 4
lr_scheduler: cosine
use_slow_update: true
use_meta_skill: true
gradient:
analyst_workers: 16
env:
name: docfaithful
# Optional: a seed skill document. Create this file (or any markdown
# file) yourself before the first run, or omit the key to let SkillOpt
# start from an empty skill.
skill_init: skillopt/envs/docfaithful/skills/initial.md
split_mode: split_dir
split_dir: data/docfaithful_split
workers: 4
max_completion_tokens: 4096
limit: 0
```
## Step 6: Run
> ⚠️ `_base_` is currently parsed as a **string path**, not a list. Write
> `_base_: ../_base_/default.yaml`, not `_base_: ['../_base_/default.yaml']`.
> See [`skillopt/config.py`](https://github.com/microsoft/SkillOpt/blob/main/skillopt/config.py)
> if you want to add list-form inheritance.
## Step 7 — Run
```bash
python scripts/train.py --config configs/my_benchmark/default.yaml
# If you set skill_init above, create the seed skill first:
# mkdir -p skillopt/envs/docfaithful/skills
# echo "# DocFaithful initial skill" > skillopt/envs/docfaithful/skills/initial.md
python scripts/train.py --config configs/docfaithful/default.yaml
```
If you get `ValueError: Unknown environment 'docfaithful'. Available: [...]`,
you forgot Step 5.
If you get `TypeError: Can't instantiate abstract class DocFaithfulAdapter`,
you forgot to implement one of the five abstract methods on `EnvAdapter`:
`build_train_env`, `build_eval_env`, `rollout`, `reflect`,
`get_task_types`.
## Tips
!!! tip
- Use a small `batch_size` (10-20) for initial testing
- The `evaluate()` method is critical — a noisy metric will confuse the optimizer
- Start with `train.batch_size: 4` and `limit: 10` while debugging.
- Scoring lives **inside your `rollout`**, not in a separate method —
there is no `evaluate()` in the `EnvAdapter` ABC. Score the
prediction in `run_batch` and put the score on each result dict's
`hard` / `soft`.
- Noisy scoring kills the optimizer. Spend time on `run_batch`'s scoring
before you spend time on prompts.
- If your benchmark needs heavy optional deps (selenium, vllm, ...),
wrap the registration block with `try / except ImportError` (Step 5)
so people without those deps can still `--help`.
- Copy `skillopt/envs/_template/` as a starting skeleton — it now
implements the real abstract methods.

View File

@@ -1,81 +1,195 @@
# API Reference
This page documents the public Python API SkillOpt exposes for **extending the
framework** with new environments / benchmarks. For ready-made adapters,
browse [`skillopt/envs/`](https://github.com/microsoft/SkillOpt/tree/main/skillopt/envs).
> **Source of truth.** The classes below are real Python ABCs defined in
> `skillopt/envs/base.py`, `skillopt/datasets/base.py`, `skillopt/types.py`,
> and `skillopt/evaluation/gate.py`. If this page ever drifts, the code
> wins — please open an issue.
---
## Core Classes
### `EnvAdapter`
Abstract base class for benchmark environments.
`skillopt/envs/base.py` — abstract adapter that connects the SkillOpt
trainer to an environment (benchmark, simulator, REST API, ...).
Subclasses **must** implement the five abstract methods below.
```python
from abc import ABC, abstractmethod
from skillopt.datasets.base import BaseDataLoader, BatchSpec
class EnvAdapter(ABC):
async def execute(self, item, skill, model) -> TaskResult
def evaluate(self, prediction, ground_truth) -> float
def build_prompt(self, item, skill) -> str
# ── Lifecycle hooks (have defaults; override only if needed) ────────
def setup(self, cfg: dict) -> None: ...
def get_dataloader(self) -> BaseDataLoader | None: ...
def requires_ray(self) -> bool: ... # default False
# ── Abstract methods (subclasses MUST implement) ────────────────────
@abstractmethod
def build_train_env(self, batch_size: int, seed: int, **kwargs):
"""Return an environment-manager object to be passed to rollout()."""
@abstractmethod
def build_eval_env(self, env_num: int, split: str, seed: int, **kwargs):
"""Like build_train_env() but for a fixed eval split."""
@abstractmethod
def rollout(self, env_manager, skill_content: str,
out_dir: str, **kwargs) -> list[dict]:
"""Run a batch of episodes with the current skill.
Each returned dict MUST contain:
- "id": str episode/task identifier
- "hard": int (0|1) pass/fail (may be float 0.0-1.0 if smoothed)
- "soft": float partial-credit score in [0.0, 1.0]
It MAY contain env-specific extra keys (parsed into RolloutResult.extras).
"""
@abstractmethod
def reflect(self, results: list[dict], skill_content: str,
out_dir: str, **kwargs) -> list[dict | None]:
"""Turn rollout results into a list of raw patch dicts.
Each dict (or None to drop the slot) MUST contain:
- "patch": {"edits": [...]} a Patch.to_dict() payload
- "source_type": "failure" | "success"
"""
@abstractmethod
def get_task_types(self) -> list[str]:
"""Distinct task-type strings used for stratified sampling."""
```
### `DataLoader`
The trainer also calls a few default-implemented helpers on every adapter:
`build_reference_text`, `get_reference_metadata`, `attach_reference_context`,
`select_representative_items`, and `build_env_from_batch`. Read the docstrings
in `skillopt/envs/base.py` if you need to override any of these — most
benchmarks don't.
Abstract base class for data loading and splitting.
### `BaseDataLoader` / `SplitDataLoader`
`skillopt/datasets/base.py` — episode-planning loaders.
```python
class DataLoader(ABC):
def setup(self, cfg: dict) -> None
def get_split_items(self, split: str) -> list[DataItem]
class BaseDataLoader(ABC):
def setup(self, cfg: dict) -> None: ...
@abstractmethod
def build_train_batch(self, batch_size: int, seed: int, **kwargs) -> BatchSpec: ...
@abstractmethod
def build_eval_batch(self, env_num: int, split: str, seed: int, **kwargs) -> BatchSpec: ...
class SplitDataLoader(BaseDataLoader):
"""Concrete base for dataset-backed envs with on-disk train/val/test splits.
Subclasses only need to implement load_split_items() (and optionally
load_raw_items() if you also want ``split_mode='ratio'``).
"""
def load_split_items(self, split_path: str) -> list[dict]: ...
def load_raw_items(self, data_path: str) -> list[dict]: ... # optional
```
### `ModelBackend`
`SplitDataLoader` handles two layout modes:
Abstract base class for LLM backends.
| `split_mode` | What it expects |
|---|---|
| `"split_dir"` | A directory with `train/`, `val/`, `test/` subdirs already split. |
| `"ratio"` | A raw dataset path + `split_ratio: "2:1:7"` style string. |
In either case the items returned by `load_split_items()` are plain
`dict` objects with at minimum an `"id"` key.
### `BatchSpec`
`skillopt/datasets/base.py` — a slotted dataclass describing one batch
request the trainer hands to the adapter.
```python
class ModelBackend(ABC):
async def generate(self, messages, **kwargs) -> ModelResponse
async def generate_with_tools(self, messages, tools, **kwargs) -> ModelResponse
```
### `Trainer`
Main training loop orchestrator.
```python
class Trainer:
def __init__(self, cfg: dict)
async def train(self) -> TrainResult
async def evaluate(self, skill: str, split: str) -> EvalResult
```
## Data Classes
### `DataItem`
```python
@dataclass
class DataItem:
id: str
input: str
ground_truth: str
@dataclass(slots=True)
class BatchSpec:
phase: str # "train" | "eval"
split: str # "train" | "val" | "test" | "valid_seen" | ...
seed: int
batch_size: int
payload: object | None = None # what the loader produced (e.g. list[dict])
metadata: dict = field(default_factory=dict)
```
### `TaskResult`
### `Edit` / `Patch`
`skillopt/types.py` — the I/O types Reflect / Aggregate / Update produce
and consume.
```python
EditOp = Literal["append", "insert_after", "replace", "delete"]
@dataclass
class TaskResult:
item_id: str
prediction: str
score: float
trajectory: list[dict]
class Edit:
op: EditOp
content: str = ""
target: str = ""
support_count: int | None = None
source_type: Literal["failure", "success"] | None = None
merge_level: int | None = None
update_origin: str = ""
update_target: str = ""
@dataclass
class Patch:
edits: list[Edit] = field(default_factory=list)
reasoning: str = ""
ranking_details: dict[str, Any] | None = None
```
### `ModelResponse`
Both types support `to_dict()` / `from_dict()` for serialization.
```python
@dataclass
class ModelResponse:
content: str
usage: dict
model: str
```
### `RolloutResult`
For detailed source code, see the [`skillopt/`](https://github.com/microsoft/SkillOpt/tree/main/skillopt) directory.
`skillopt/types.py` — the normalised rollout return type. The trainer
calls `RolloutResult.from_dict(...)` on each dict returned from
`EnvAdapter.rollout()`, so the only **hard** requirement on those dicts is
the three keys above (`id`, `hard`, `soft`). Extra fields are preserved
into `RolloutResult.extras`.
### `GateResult` / `GateAction`
`skillopt/evaluation/gate.py` — the validation-gate decision types
returned each epoch.
---
## Registering an environment
Environments are not registered via decorators or a `BENCHMARK_REGISTRY`
dict. The trainer keeps a lazy registry, `_ENV_REGISTRY`, inside
`scripts/train.py`; it is populated by `_register_builtins()`. To add a new env
you append a `try / except ImportError` block there. See
[Add a New Benchmark](../guide/new-benchmark.md) for the full step-by-step.
---
## Backends (model layer)
The model layer lives under `skillopt.model.*`. Backends are selected
via `model.optimizer_backend` and `model.target_backend` in the config —
not via a base class subclass. Supported values (as of this writing):
| Backend | Optimizer? | Target? |
|---|---|---|
| `openai_chat` | ✓ | ✓ |
| `claude_chat` | ✓ | ✓ |
| `qwen_chat` | ✓ | ✓ |
| `minimax_chat` | ✓ | ✓ |
| `codex_exec` | — | ✓ |
| `claude_code_exec` | — | ✓ |
See `skillopt/model/backend_config.py` for the live whitelist and
[`docs/reference/config.md`](./config.md) for the per-backend
configuration keys.

View File

@@ -4,16 +4,40 @@ This directory provides scaffold files for adding a new benchmark to SkillOpt.
## Files
- `env_template.py` — Environment adapter template
- `loader_template.py` — Data loader template
- `config_template.yaml` — Config file template
- `env_template.py` — Environment adapter template (subclasses
`EnvAdapter`; implements the 5 abstract methods so the file is
instantiable out of the box).
- `loader_template.py` — Data loader template (subclasses
`SplitDataLoader`; implements `load_split_items` for `.json`/`.jsonl`).
- `config_template.yaml` — Config file template.
## Usage
1. Copy this directory: `cp -r skillopt/envs/_template skillopt/envs/your_benchmark`
2. Rename files: remove `_template` suffix
3. Implement the `TODO` sections
4. Register in `skillopt/envs/__init__.py`
5. Create config at `configs/your_benchmark/default.yaml`
1. **Copy the directory:**
```bash
cp -r skillopt/envs/_template skillopt/envs/your_benchmark
```
2. **Rename the files** (drop the `_template` suffix):
```bash
cd skillopt/envs/your_benchmark
mv env_template.py adapter.py
mv loader_template.py loader.py
```
…and inside each file rename the classes
(`TemplateBenchmarkEnv → YourBenchmarkAdapter`,
`TemplateBenchmarkLoader → YourBenchmarkLoader`)
and fix the cross-import in `adapter.py`.
3. **Implement the TODO blocks** inside `adapter.py:rollout` and the
`_normalize_item` helper in `loader.py`. If you want real reflection,
uncomment the `run_minibatch_reflect` block in `adapter.py:reflect`.
4. **Register** the adapter — add a `try / except ImportError` block in
`scripts/train.py`'s `_register_builtins()` mapping the registry key
to your `YourBenchmarkAdapter` class. There is no
`BENCHMARK_REGISTRY` dict in `skillopt/envs/__init__.py`; the live
registry is `_ENV_REGISTRY` in `scripts/train.py`.
5. **Create the config** at `configs/your_benchmark/default.yaml`
(start from `config_template.yaml`). `_base_` is a **string path**,
not a list.
See the [documentation](../../docs/guide/new-benchmark.md) for the full guide.
See the [Add a New Benchmark guide](../../../docs/guide/new-benchmark.md)
for the full step-by-step with a worked `docfaithful` example.

View File

@@ -4,27 +4,36 @@
# Copy this file to configs/<your_benchmark>/default.yaml
# and customize the values below.
# Inherit global defaults
_base_: ['../_base_/default.yaml']
# Inherit global defaults.
# NOTE: `_base_` is a string path, not a list.
_base_: ../_base_/default.yaml
# ── Environment ──────────────────────────────────
env:
name: your_benchmark # Must match registry key
data_path: data/your_benchmark # Path to your data
name: your_benchmark # Must match the key registered in scripts/train.py
# Optional: a seed skill document. Create this file yourself before the
# first run, or omit the key to start from an empty skill.
# skill_init: skillopt/envs/your_benchmark/skills/initial.md
data_path: data/your_benchmark # Path to your data (for split_mode: ratio)
split_dir: "" # Set this and use split_mode: split_dir for pre-split data
split_mode: ratio # "ratio" or "split_dir"
split_ratio: "2:1:7" # train:val:test
exec_timeout: 120 # Per-task timeout (seconds)
split_ratio: "2:1:7" # train:val:test (used when split_mode: ratio)
workers: 4 # Parallel rollout workers
max_completion_tokens: 4096 # Cap per target-model call
limit: 0 # 0 = no limit; small int = debug sample
# ── Training ─────────────────────────────────────
train:
num_epochs: 4 # Number of epochs
batch_size: 40 # Tasks per step (batch size)
num_epochs: 4
batch_size: 40
accumulation: 1
seed: 42
# ── Gradient (Reflection) ───────────────────────
gradient:
analyst_workers: 16 # Parallel reflection workers
minibatch_size: 8
merge_batch_size: 8
# ── Optimizer ────────────────────────────────────
optimizer:
@@ -39,7 +48,8 @@ evaluation:
eval_test: true # Run test eval after training
# ── Model ────────────────────────────────────────
# Override only what differs from the inherited defaults.
model:
backend: azure_openai # azure_openai | openai_chat | claude_code_exec | qwen
optimizer: gpt-4o
target: gpt-4o
optimizer_backend: openai_chat # openai_chat | claude_chat | qwen_chat | minimax_chat
target_backend: openai_chat # … plus codex_exec / claude_code_exec for target only
reasoning_effort: medium

View File

@@ -4,89 +4,193 @@ Benchmark Environment Template
Copy this file and implement the TODO sections to add a new benchmark.
The EnvAdapter is responsible for:
1. Executing tasks using the target model + current skill document
2. Evaluating predictions against ground truth
3. Returning structured results for the training loop
1. Building per-batch environment managers (train and eval splits).
2. Running rollouts under the current skill document.
3. Reflecting on those rollouts into raw patch dicts.
4. Reporting the distinct task types in your data (for stratified
sampling).
For a fully worked example see ``skillopt/envs/officeqa/``.
"""
from __future__ import annotations
import os
from skillopt.datasets.base import BatchSpec
from skillopt.envs.base import EnvAdapter
from skillopt.envs._template.loader_template import TemplateBenchmarkLoader
# When you wire in real reflection, also import:
# from skillopt.gradient.reflect import run_minibatch_reflect
class TemplateBenchmarkEnv(EnvAdapter):
"""
Environment adapter for <Your Benchmark Name>.
Rename this class and implement the abstract methods below.
Rename this class. Each abstract method below is required by
:class:`skillopt.envs.base.EnvAdapter`. The template implementations
are minimal so this file is importable and instantiable; replace the
TODOs with real logic.
"""
def __init__(self, cfg: dict):
super().__init__(cfg)
# TODO: Initialize benchmark-specific state
# Example: self.tools = load_tools(cfg)
def __init__(
self,
split_dir: str = "",
data_path: str = "",
split_mode: str = "split_dir",
split_ratio: str = "2:1:7",
split_seed: int = 42,
split_output_dir: str = "",
workers: int = 4,
analyst_workers: int = 4,
failure_only: bool = False,
minibatch_size: int = 8,
edit_budget: int = 4,
seed: int = 42,
limit: int = 0,
max_completion_tokens: int = 4096,
) -> None:
self.workers = workers
self.analyst_workers = analyst_workers
self.failure_only = failure_only
self.minibatch_size = minibatch_size
self.edit_budget = edit_budget
self.max_completion_tokens = int(max_completion_tokens)
self.dataloader = TemplateBenchmarkLoader(
split_dir=split_dir,
data_path=data_path,
split_mode=split_mode,
split_ratio=split_ratio,
split_seed=split_seed,
split_output_dir=split_output_dir,
seed=seed,
limit=limit,
)
async def execute(self, item, skill: str, model):
# ── Lifecycle hooks ────────────────────────────────────────────────
def setup(self, cfg: dict) -> None:
super().setup(cfg)
self.dataloader.setup(cfg)
def get_dataloader(self):
return self.dataloader
# ── Batch → env manager ────────────────────────────────────────────
def build_env_from_batch(self, batch: BatchSpec, **kwargs):
# Dataset-backed envs typically just pass items straight through.
return list(batch.payload or [])
def build_train_env(self, batch_size: int, seed: int, **kwargs):
batch = self.dataloader.build_train_batch(
batch_size=batch_size, seed=seed, **kwargs
)
return self.build_env_from_batch(batch, **kwargs)
def build_eval_env(self, env_num: int, split: str, seed: int, **kwargs):
batch = self.dataloader.build_eval_batch(
env_num=env_num, split=split, seed=seed, **kwargs
)
return self.build_env_from_batch(batch, **kwargs)
# ── Rollout: run episodes under current skill ──────────────────────
def rollout(
self,
env_manager,
skill_content: str,
out_dir: str,
**kwargs,
) -> list[dict]:
"""
Execute a single task with the target model.
Run a batch of episodes under the current skill.
Args:
item: DataItem with .id, .input, .ground_truth, .metadata
skill: Current skill document content (Markdown string)
model: Target model backend instance
Returns:
TaskResult with prediction, score, and trajectory
TODO: replace this loop with your real rollout. For each item:
1. Build the prompt using `skill_content` as the system message.
2. Call your target model.
3. Score the prediction.
4. Return a dict with at minimum: ``id`` (str), ``hard`` (0|1),
``soft`` (float in [0, 1]). Add any env-specific extras you
need for reflect() — they will be preserved on
``RolloutResult.extras``.
"""
# Step 1: Build the prompt combining skill + task input
prompt = self.build_prompt(item, skill)
items: list[dict] = env_manager
results: list[dict] = []
for item in items:
# ── REPLACE THIS BLOCK WITH YOUR REAL ROLLOUT ──
results.append(
{
"id": str(item.get("id", "")),
"hard": 0,
"soft": 0.0,
"predicted_answer": "",
"question": item.get("question", ""),
"fail_reason": "template rollout — not implemented",
}
)
return results
# Step 2: Call the target model
# TODO: Customize the message format for your benchmark
messages = [
{"role": "system", "content": skill},
{"role": "user", "content": item.input},
]
response = await model.generate(messages)
# ── Reflect: turn rollout results into patch dicts ─────────────────
# Step 3: Parse the model response into a prediction
prediction = self.parse_response(response.content)
# Step 4: Score the prediction
score = self.evaluate(prediction, item.ground_truth)
# Step 5: Return structured result
return {
"item_id": item.id,
"prediction": prediction,
"score": score,
"trajectory": messages + [{"role": "assistant", "content": response.content}],
}
def evaluate(self, prediction: str, ground_truth: str) -> float:
def reflect(
    self,
    results: list[dict],
    skill_content: str,
    out_dir: str,
    **kwargs,
) -> list[dict | None]:
    """Turn rollouts into a list of raw patch dicts (or None to drop).

    The template default produces no patches: it returns one ``None`` per
    rollout result, which makes the trainer step a no-op.

    Each non-None dict MUST have:
      - "patch":        {"edits": [...]}  a Patch.to_dict() payload
      - "source_type":  "failure" | "success"

    Most benchmarks delegate to
    :func:`skillopt.gradient.reflect.run_minibatch_reflect` which
    will call the optimizer model with the
    ``analyst_error_*`` / ``analyst_success_*`` prompts. To enable it,
    replace the default ``return`` below with the following (the snippet
    uses ``os.path.join``, so ``os`` must be imported; the ``self.*``
    attributes are assumed to be provided by the adapter — confirm
    against your base class):

        from skillopt.gradient.reflect import run_minibatch_reflect
        return run_minibatch_reflect(
            results=results,
            skill_content=skill_content,
            prediction_dir=kwargs.get(
                "prediction_dir", os.path.join(out_dir, "predictions")
            ),
            patches_dir=kwargs.get(
                "patches_dir", os.path.join(out_dir, "patches")
            ),
            workers=self.analyst_workers,
            failure_only=self.failure_only,
            minibatch_size=self.minibatch_size,
            edit_budget=self.edit_budget,
            random_seed=kwargs.get("random_seed"),
            error_system=self.get_error_minibatch_prompt(),
            success_system=self.get_success_minibatch_prompt(),
            step_buffer_context=kwargs.get("step_buffer_context", ""),
            update_mode=getattr(self, "_cfg", {}).get(
                "skill_update_mode", "patch"
            ),
        )

    Args:
        results: Rollout result dicts, as returned by ``rollout``.
        skill_content: Current skill document. Unused by the default.
        out_dir: Output directory for this step. Unused by the default.
        **kwargs: Accepted for interface compatibility; ignored by the
            default.

    Returns:
        A list with one entry per element of ``results``; every entry is
        ``None`` in the template default.
    """
    # Template default: produce no patches (no-op trainer step).
    return [None for _ in results]
# ── Stratification hint ────────────────────────────────────────────
def get_task_types(self) -> list[str]:
    """Return the distinct task-type strings used for stratified sampling.

    Items from the train, val and test splits are scanned in that order.
    An item whose ``task_type`` is missing or falsy is counted as
    ``"template"``.

    Returns:
        Task types in first-seen order, or ``["template"]`` when all
        three splits are empty.
    """
    loader = self.dataloader
    # A dict keeps insertion order, giving first-seen ordering with O(1)
    # membership checks instead of a linear scan over a list.
    seen: dict[str, None] = {}
    # Walk the splits one after another rather than concatenating them
    # into a throwaway combined list.
    for split_items in (loader.train_items, loader.val_items, loader.test_items):
        for item in split_items:
            seen.setdefault(str(item.get("task_type") or "template"), None)
    return list(seen) or ["template"]

View File

@@ -1,103 +1,87 @@
"""
Benchmark Data Loader Template
================================
Copy this file and implement the TODO sections to load your benchmark data.
Copy this file and implement ``load_split_items`` to load your benchmark
data. The loader is a :class:`skillopt.datasets.base.SplitDataLoader`
subclass — the base class handles both ``split_mode="split_dir"`` (read
an existing train/val/test layout) and ``split_mode="ratio"`` (build the
splits from a single raw file deterministically).
The DataLoader is responsible for:
1. Loading raw data from disk
2. Splitting into train / validation / test sets
3. Providing DataItem objects to the training loop
For a fully worked example see
``skillopt/envs/officeqa/dataloader.py``.
"""
from __future__ import annotations
import json
from pathlib import Path
from skillopt.datasets.base import SplitDataLoader
class TemplateBenchmarkLoader:
def _normalize_item(raw: dict) -> dict:
"""
Normalise one raw entry into the dict shape SkillOpt expects.
The only **hard** requirement is ``"id"`` (str). Add whatever extra
fields your :class:`TemplateBenchmarkEnv.rollout` needs.
"""
return {
"id": str(raw.get("uid") or raw.get("id") or ""),
"question": str(raw.get("question") or raw.get("prompt") or ""),
"ground_truth": str(raw.get("ground_truth") or raw.get("answer") or ""),
"task_type": str(raw.get("category") or raw.get("task_type") or "template"),
# ── add benchmark-specific keys here ──
}
class TemplateBenchmarkLoader(SplitDataLoader):
    """
    Data loader for <Your Benchmark Name>.

    Rename this class and implement the methods below.

    Subclass note: you usually only need to implement
    :meth:`load_split_items`. The base class drives ``setup(cfg)``,
    materialises ratio-mode splits, exposes ``train_items``,
    ``val_items``, ``test_items``, and builds ``BatchSpec`` objects on
    demand.

    If you want to support ``split_mode="ratio"`` (auto-split a single
    file into train/val/test), also implement
    :meth:`load_raw_items(data_path)` returning the full list of items.
    """

    def load_split_items(self, split_path: str) -> list[dict]:
        """Load the items for one split directory.

        ``split_path`` is e.g. ``data/your_benchmark/train/``. Return a
        list of dicts, each shaped like :func:`_normalize_item`'s output.

        Only one file is read: the first ``*.json`` file in sorted name
        order if any exists, otherwise the first ``*.jsonl`` file. Any
        further files in the directory are ignored.

        Args:
            split_path: Directory holding the data file for one split.

        Returns:
            The normalised items, in file order. Blank lines in a
            ``.jsonl`` file are skipped.

        Raises:
            ValueError: The chosen ``.json`` file does not contain a JSON
                array at its top level.
            FileNotFoundError: The directory has neither a ``.json`` nor
                a ``.jsonl`` file.
        """
        path = Path(split_path)

        # A whole-file JSON array takes precedence over JSON Lines.
        json_files = sorted(path.glob("*.json"))
        if json_files:
            with json_files[0].open(encoding="utf-8") as f:
                payload = json.load(f)
            if not isinstance(payload, list):
                raise ValueError(
                    f"Expected JSON array at top level of {json_files[0]}"
                )
            return [_normalize_item(row) for row in payload]

        jsonl_files = sorted(path.glob("*.jsonl"))
        if jsonl_files:
            items: list[dict] = []
            with jsonl_files[0].open(encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    items.append(_normalize_item(json.loads(line)))
            return items

        raise FileNotFoundError(
            f"No .json or .jsonl file found in {split_path}"
        )

    # Optional — only needed if you intend to use ``split_mode='ratio'``.
    # def load_raw_items(self, data_path: str) -> list[dict]:
    #     ...