envs/_template: make template instantiable against real EnvAdapter ABC

The shipped env_template.py and loader_template.py described the same
fictional async execute / evaluate / build_prompt API documented in
docs/reference/api.md. As a result, TemplateBenchmarkEnv(cfg) raised
'TypeError: Can't instantiate abstract class' for anyone who copied
the in-tree scaffold.

Rewrite the template so it's a working starting point:

- env_template.py: TemplateBenchmarkEnv(EnvAdapter) now implements all
  five real abstract methods (build_train_env, build_eval_env, rollout,
  reflect, get_task_types) with no-op defaults documented as TODO.
  Instantiable today; pytest 60/60 still passes.
- loader_template.py: TemplateBenchmarkLoader(SplitDataLoader)
  implements load_split_items for .json / .jsonl input and explains the
  optional load_raw_items override for split_mode="ratio".
- README.md: usage steps now point at scripts/train.py's _ENV_REGISTRY
  (the real registry) instead of a non-existent BENCHMARK_REGISTRY in
  skillopt/envs/__init__.py, and link to the rewritten new-benchmark
  guide.
- config_template.yaml: _base_ is a string path (not a list, which the
  loader rejects); skill_init is commented out with a note so the
  template config doesn't reference a file the user hasn't created.

Verified locally: 'from skillopt.envs._template.env_template import
TemplateBenchmarkEnv; TemplateBenchmarkEnv()' succeeds. Refs
microsoft/SkillOpt#30.

Co-Authored-By: Claude Opus 4 <noreply@anthropic.com>
Yifan Yang
2026-06-01 20:15:12 +00:00
parent 2ca2910649
commit 4eb4c64b2a
4 changed files with 292 additions and 170 deletions
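
The TypeError quoted in the commit message is standard `abc` behaviour: a subclass that does not override every abstract method cannot be instantiated. A minimal sketch, using hypothetical stand-in classes (`DemoAdapter`, `StaleTemplate`, `WorkingTemplate`) rather than the real `EnvAdapter`, whose definition is not shown in this commit:

```python
from abc import ABC, abstractmethod


class DemoAdapter(ABC):
    """Hypothetical stand-in for an abstract adapter base class."""

    @abstractmethod
    def rollout(self, items: list[dict]) -> list[dict]: ...

    @abstractmethod
    def get_task_types(self) -> list[str]: ...


class StaleTemplate(DemoAdapter):
    """Implements a method the base class never asks for."""

    def execute(self, item: dict) -> dict:
        return {}


class WorkingTemplate(DemoAdapter):
    """Overrides every abstract method, so it can be instantiated."""

    def rollout(self, items: list[dict]) -> list[dict]:
        return [{"id": str(i.get("id", "")), "hard": 0, "soft": 0.0} for i in items]

    def get_task_types(self) -> list[str]:
        return ["template"]


def try_instantiate(cls: type) -> str:
    """Return "ok" or the TypeError text raised by calling cls()."""
    try:
        cls()
    except TypeError as exc:
        return f"TypeError: {exc}"
    return "ok"


print(try_instantiate(StaleTemplate))
print(try_instantiate(WorkingTemplate))
```

The exact wording after "Can't instantiate abstract class" differs between Python versions, so checks should match only that prefix.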

skillopt/envs/_template/README.md

@@ -4,16 +4,40 @@ This directory provides scaffold files for adding a new benchmark to SkillOpt.
## Files
- `env_template.py` — Environment adapter template
- `loader_template.py` — Data loader template
- `config_template.yaml` — Config file template
- `env_template.py` — Environment adapter template (subclasses
`EnvAdapter`; implements the 5 abstract methods so the file is
instantiable out of the box).
- `loader_template.py` — Data loader template (subclasses
`SplitDataLoader`; implements `load_split_items` for `.json`/`.jsonl`).
- `config_template.yaml` — Config file template.
## Usage
1. Copy this directory: `cp -r skillopt/envs/_template skillopt/envs/your_benchmark`
2. Rename files: remove `_template` suffix
3. Implement the `TODO` sections
4. Register in `skillopt/envs/__init__.py`
5. Create config at `configs/your_benchmark/default.yaml`
1. **Copy the directory:**
```bash
cp -r skillopt/envs/_template skillopt/envs/your_benchmark
```
2. **Rename the files** (drop the `_template` suffix):
```bash
cd skillopt/envs/your_benchmark
mv env_template.py adapter.py
mv loader_template.py loader.py
```
…and inside each file rename the classes
(`TemplateBenchmarkEnv → YourBenchmarkAdapter`,
`TemplateBenchmarkLoader → YourBenchmarkLoader`)
and fix the cross-import in `adapter.py`.
3. **Implement the TODO blocks** inside `adapter.py:rollout` and the
`_normalize_item` helper in `loader.py`. If you want real reflection,
uncomment the `run_minibatch_reflect` block in `adapter.py:reflect`.
4. **Register** the adapter — add a `try / except ImportError` block in
`scripts/train.py`'s `_register_builtins()` mapping the registry key
to your `YourBenchmarkAdapter` class. There is no
`BENCHMARK_REGISTRY` dict in `skillopt/envs/__init__.py`; the live
registry is `_ENV_REGISTRY` in `scripts/train.py`.
5. **Create the config** at `configs/your_benchmark/default.yaml`
(start from `config_template.yaml`). `_base_` is a **string path**,
not a list.
See the [documentation](../../docs/guide/new-benchmark.md) for the full guide.
See the [Add a New Benchmark guide](../../../docs/guide/new-benchmark.md)
for the full step-by-step with a worked `docfaithful` example.

skillopt/envs/_template/config_template.yaml

@@ -4,27 +4,36 @@
# Copy this file to configs/<your_benchmark>/default.yaml
# and customize the values below.
# Inherit global defaults
_base_: ['../_base_/default.yaml']
# Inherit global defaults.
# NOTE: `_base_` is a string path, not a list.
_base_: ../_base_/default.yaml
# ── Environment ──────────────────────────────────
env:
name: your_benchmark # Must match registry key
data_path: data/your_benchmark # Path to your data
name: your_benchmark # Must match the key registered in scripts/train.py
# Optional: a seed skill document. Create this file yourself before the
# first run, or omit the key to start from an empty skill.
# skill_init: skillopt/envs/your_benchmark/skills/initial.md
data_path: data/your_benchmark # Path to your data (for split_mode: ratio)
split_dir: "" # Set this and use split_mode: split_dir for pre-split data
split_mode: ratio # "ratio" or "split_dir"
split_ratio: "2:1:7" # train:val:test
exec_timeout: 120 # Per-task timeout (seconds)
split_ratio: "2:1:7" # train:val:test (used when split_mode: ratio)
workers: 4 # Parallel rollout workers
max_completion_tokens: 4096 # Cap per target-model call
limit: 0 # 0 = no limit; small int = debug sample
# ── Training ─────────────────────────────────────
train:
num_epochs: 4 # Number of epochs
batch_size: 40 # Tasks per step (batch size)
num_epochs: 4
batch_size: 40
accumulation: 1
seed: 42
# ── Gradient (Reflection) ───────────────────────
gradient:
analyst_workers: 16 # Parallel reflection workers
minibatch_size: 8
merge_batch_size: 8
# ── Optimizer ────────────────────────────────────
optimizer:
@@ -39,7 +48,8 @@ evaluation:
eval_test: true # Run test eval after training
# ── Model ────────────────────────────────────────
# Override only what differs from the inherited defaults.
model:
backend: azure_openai # azure_openai | openai_chat | claude_code_exec | qwen
optimizer: gpt-4o
target: gpt-4o
optimizer_backend: openai_chat # openai_chat | claude_chat | qwen_chat | minimax_chat
target_backend: openai_chat # … plus codex_exec / claude_code_exec for target only
reasoning_effort: medium
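
To illustrate how a `split_ratio` such as `"2:1:7"` maps to train/val/test sizes, here is a minimal sketch. The helpers `parse_split_ratio` and `split_counts` are hypothetical; the actual splitting logic lives in `SplitDataLoader`, is not shown in this commit, and may round differently.

```python
from fractions import Fraction


def parse_split_ratio(ratio: str) -> tuple[Fraction, Fraction, Fraction]:
    """Turn 'train:val:test' weights into exact fractions that sum to 1."""
    weights = [Fraction(part.strip()) for part in ratio.split(":")]
    if len(weights) != 3 or any(w < 0 for w in weights) or sum(weights) == 0:
        raise ValueError(f"expected 'train:val:test' weights, got {ratio!r}")
    total = sum(weights)
    train, val, test = (w / total for w in weights)
    return train, val, test


def split_counts(n_items: int, ratio: str) -> tuple[int, int, int]:
    """Floor the train and val counts; the remainder goes to test."""
    train, val, _ = parse_split_ratio(ratio)
    n_train = int(n_items * train)
    n_val = int(n_items * val)
    return n_train, n_val, n_items - n_train - n_val


print(split_counts(100, "2:1:7"))  # (20, 10, 70)
```

Using `Fraction` avoids floating-point surprises when the counts are floored.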

skillopt/envs/_template/env_template.py

@@ -4,89 +4,193 @@ Benchmark Environment Template
Copy this file and implement the TODO sections to add a new benchmark.
The EnvAdapter is responsible for:
1. Executing tasks using the target model + current skill document
2. Evaluating predictions against ground truth
3. Returning structured results for the training loop
1. Building per-batch environment managers (train and eval splits).
2. Running rollouts under the current skill document.
3. Reflecting on those rollouts into raw patch dicts.
4. Reporting the distinct task types in your data (for stratified
sampling).
For a fully worked example see ``skillopt/envs/officeqa/``.
"""
from __future__ import annotations
import os
from skillopt.datasets.base import BatchSpec
from skillopt.envs.base import EnvAdapter
from skillopt.envs._template.loader_template import TemplateBenchmarkLoader
# When you wire in real reflection, also import:
# from skillopt.gradient.reflect import run_minibatch_reflect
class TemplateBenchmarkEnv(EnvAdapter):
"""
Environment adapter for <Your Benchmark Name>.
Rename this class and implement the abstract methods below.
Rename this class. Each abstract method below is required by
:class:`skillopt.envs.base.EnvAdapter`. The template implementations
are minimal so this file is importable and instantiable; replace the
TODOs with real logic.
"""
def __init__(self, cfg: dict):
super().__init__(cfg)
# TODO: Initialize benchmark-specific state
# Example: self.tools = load_tools(cfg)
def __init__(
self,
split_dir: str = "",
data_path: str = "",
split_mode: str = "split_dir",
split_ratio: str = "2:1:7",
split_seed: int = 42,
split_output_dir: str = "",
workers: int = 4,
analyst_workers: int = 4,
failure_only: bool = False,
minibatch_size: int = 8,
edit_budget: int = 4,
seed: int = 42,
limit: int = 0,
max_completion_tokens: int = 4096,
) -> None:
self.workers = workers
self.analyst_workers = analyst_workers
self.failure_only = failure_only
self.minibatch_size = minibatch_size
self.edit_budget = edit_budget
self.max_completion_tokens = int(max_completion_tokens)
self.dataloader = TemplateBenchmarkLoader(
split_dir=split_dir,
data_path=data_path,
split_mode=split_mode,
split_ratio=split_ratio,
split_seed=split_seed,
split_output_dir=split_output_dir,
seed=seed,
limit=limit,
)
async def execute(self, item, skill: str, model):
# ── Lifecycle hooks ────────────────────────────────────────────────
def setup(self, cfg: dict) -> None:
super().setup(cfg)
self.dataloader.setup(cfg)
def get_dataloader(self):
return self.dataloader
# ── Batch → env manager ────────────────────────────────────────────
def build_env_from_batch(self, batch: BatchSpec, **kwargs):
# Dataset-backed envs typically just pass items straight through.
return list(batch.payload or [])
def build_train_env(self, batch_size: int, seed: int, **kwargs):
batch = self.dataloader.build_train_batch(
batch_size=batch_size, seed=seed, **kwargs
)
return self.build_env_from_batch(batch, **kwargs)
def build_eval_env(self, env_num: int, split: str, seed: int, **kwargs):
batch = self.dataloader.build_eval_batch(
env_num=env_num, split=split, seed=seed, **kwargs
)
return self.build_env_from_batch(batch, **kwargs)
# ── Rollout: run episodes under current skill ──────────────────────
def rollout(
self,
env_manager,
skill_content: str,
out_dir: str,
**kwargs,
) -> list[dict]:
"""
Execute a single task with the target model.
Run a batch of episodes under the current skill.
Args:
item: DataItem with .id, .input, .ground_truth, .metadata
skill: Current skill document content (Markdown string)
model: Target model backend instance
Returns:
TaskResult with prediction, score, and trajectory
TODO: replace this loop with your real rollout. For each item:
1. Build the prompt using `skill_content` as the system message.
2. Call your target model.
3. Score the prediction.
4. Return a dict with at minimum: ``id`` (str), ``hard`` (0|1),
``soft`` (float in [0, 1]). Add any env-specific extras you
need for reflect() — they will be preserved on
``RolloutResult.extras``.
"""
# Step 1: Build the prompt combining skill + task input
prompt = self.build_prompt(item, skill)
items: list[dict] = env_manager
results: list[dict] = []
for item in items:
# ── REPLACE THIS BLOCK WITH YOUR REAL ROLLOUT ──
results.append(
{
"id": str(item.get("id", "")),
"hard": 0,
"soft": 0.0,
"predicted_answer": "",
"question": item.get("question", ""),
"fail_reason": "template rollout — not implemented",
}
)
return results
# Step 2: Call the target model
# TODO: Customize the message format for your benchmark
messages = [
{"role": "system", "content": skill},
{"role": "user", "content": item.input},
]
response = await model.generate(messages)
# ── Reflect: turn rollout results into patch dicts ─────────────────
# Step 3: Parse the model response into a prediction
prediction = self.parse_response(response.content)
# Step 4: Score the prediction
score = self.evaluate(prediction, item.ground_truth)
# Step 5: Return structured result
return {
"item_id": item.id,
"prediction": prediction,
"score": score,
"trajectory": messages + [{"role": "assistant", "content": response.content}],
}
def evaluate(self, prediction: str, ground_truth: str) -> float:
def reflect(
self,
results: list[dict],
skill_content: str,
out_dir: str,
**kwargs,
) -> list[dict | None]:
"""
Score a prediction against the ground truth.
Turn rollouts into a list of raw patch dicts (or None to drop).
Returns:
Float between 0.0 (wrong) and 1.0 (correct)
TODO: Implement your scoring metric. Common options:
- Exact match: float(pred.strip().lower() == gt.strip().lower())
- F1 score: compute token overlap
- ANLS: for document QA tasks
- Custom: any float in [0, 1]
"""
# Placeholder — exact match
return float(prediction.strip().lower() == ground_truth.strip().lower())
Each non-None dict MUST have:
- "patch": {"edits": [...]} a Patch.to_dict() payload
- "source_type": "failure" | "success"
def build_prompt(self, item, skill: str) -> str:
"""Combine skill document with task input."""
return f"{skill}\n\n---\n\nQuestion: {item.input}"
Most benchmarks delegate to
:func:`skillopt.gradient.reflect.run_minibatch_reflect` which
will call the optimizer model with the
``analyst_error_*`` / ``analyst_success_*`` prompts. To enable it,
uncomment the import above and call:
def parse_response(self, response: str) -> str:
from skillopt.gradient.reflect import run_minibatch_reflect
return run_minibatch_reflect(
results=results,
skill_content=skill_content,
prediction_dir=kwargs.get(
"prediction_dir", os.path.join(out_dir, "predictions")
),
patches_dir=kwargs.get(
"patches_dir", os.path.join(out_dir, "patches")
),
workers=self.analyst_workers,
failure_only=self.failure_only,
minibatch_size=self.minibatch_size,
edit_budget=self.edit_budget,
random_seed=kwargs.get("random_seed"),
error_system=self.get_error_minibatch_prompt(),
success_system=self.get_success_minibatch_prompt(),
step_buffer_context=kwargs.get("step_buffer_context", ""),
update_mode=getattr(self, "_cfg", {}).get(
"skill_update_mode", "patch"
),
)
"""
Extract the answer from the model's raw response.
TODO: Implement extraction logic. For example:
- Extract text after "Answer:"
- Parse JSON output
- Extract from code blocks
"""
return response.strip()
# Template default: produce no patches (no-op trainer step).
return [None for _ in results]
# ── Stratification hint ────────────────────────────────────────────
def get_task_types(self) -> list[str]:
"""Distinct task-type strings used for stratified sampling."""
seen: list[str] = []
all_items = (
self.dataloader.train_items
+ self.dataloader.val_items
+ self.dataloader.test_items
)
for item in all_items:
tt = str(item.get("task_type") or "template")
if tt not in seen:
seen.append(tt)
return seen or ["template"]
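
One way the rollout TODO could be filled in is sketched below as a standalone function. `call_model` is a hypothetical callable standing in for the target model (the real backend API is not shown in this commit), and exact match is only one possible scoring choice. The result keys follow the contract in the `rollout` docstring (`id`, `hard`, `soft`, plus extras).

```python
from typing import Callable


def exact_match(prediction: str, ground_truth: str) -> int:
    """Return 1 when the answers match after trimming and lowercasing."""
    return int(prediction.strip().lower() == ground_truth.strip().lower())


def rollout_items(
    items: list[dict],
    skill_content: str,
    call_model: Callable[[str, str], str],  # hypothetical: (system, user) -> text
) -> list[dict]:
    """Run each item through the model and score it with exact match."""
    results: list[dict] = []
    for item in items:
        question = str(item.get("question", ""))
        # The skill document is passed as the system message.
        prediction = call_model(skill_content, question)
        hard = exact_match(prediction, str(item.get("ground_truth", "")))
        results.append(
            {
                "id": str(item.get("id", "")),
                "hard": hard,
                "soft": float(hard),
                "predicted_answer": prediction,
                "question": question,
                "fail_reason": "" if hard else "prediction did not match ground truth",
            }
        )
    return results
```

Passing the model in as a callable keeps the sketch testable with a stub, for example `rollout_items(items, skill, lambda system, user: "Paris")`.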

skillopt/envs/_template/loader_template.py

@@ -1,103 +1,87 @@
"""
Benchmark Data Loader Template
================================
Copy this file and implement the TODO sections to load your benchmark data.
Copy this file and implement ``load_split_items`` to load your benchmark
data. The loader is a :class:`skillopt.datasets.base.SplitDataLoader`
subclass — the base class handles both ``split_mode="split_dir"`` (read
an existing train/val/test layout) and ``split_mode="ratio"`` (build the
splits from a single raw file deterministically).
The DataLoader is responsible for:
1. Loading raw data from disk
2. Splitting into train / validation / test sets
3. Providing DataItem objects to the training loop
For a fully worked example see
``skillopt/envs/officeqa/dataloader.py``.
"""
from __future__ import annotations
import json
from pathlib import Path
from skillopt.datasets.base import SplitDataLoader
class TemplateBenchmarkLoader:
def _normalize_item(raw: dict) -> dict:
"""
Normalise one raw entry into the dict shape SkillOpt expects.
The only **hard** requirement is ``"id"`` (str). Add whatever extra
fields your :meth:`TemplateBenchmarkEnv.rollout` needs.
"""
return {
"id": str(raw.get("uid") or raw.get("id") or ""),
"question": str(raw.get("question") or raw.get("prompt") or ""),
"ground_truth": str(raw.get("ground_truth") or raw.get("answer") or ""),
"task_type": str(raw.get("category") or raw.get("task_type") or "template"),
# ── add benchmark-specific keys here ──
}
class TemplateBenchmarkLoader(SplitDataLoader):
"""
Data loader for <Your Benchmark Name>.
Rename this class and implement the methods below.
Subclass note: you usually only need to implement
:meth:`load_split_items`. The base class drives ``setup(cfg)``,
materialises ratio-mode splits, exposes ``train_items``,
``val_items``, ``test_items``, and builds ``BatchSpec`` objects on
demand.
If you want to support ``split_mode="ratio"`` (auto-split a single
file into train/val/test), also implement :meth:`load_raw_items`,
which takes ``data_path`` and returns the full list of items.
"""
def __init__(self, data_dir: str = "data/your_benchmark", **kwargs):
self.data_dir = Path(data_dir)
self.items = []
self.splits = {}
def load_split_items(self, split_path: str) -> list[dict]:
"""Load all items for one split directory.
def setup(self, cfg: dict):
``split_path`` is e.g. ``data/your_benchmark/train/``. Return a
list of dicts, each shaped like :func:`_normalize_item`'s output.
"""
Initialize the loader with config.
Called once before training starts.
Args:
cfg: Dict with keys like 'split_mode', 'train_ratio', 'val_ratio', etc.
"""
# Step 1: Load raw data
self.items = self._load_items()
path = Path(split_path)
# Step 2: Create splits
split_mode = cfg.get("split_mode", "ratio")
if split_mode == "ratio":
self._split_by_ratio(
train_ratio=cfg.get("train_ratio", 0.7),
val_ratio=cfg.get("val_ratio", 0.15),
)
elif split_mode == "split_dir":
self._load_predefined_splits(cfg.get("split_dir", self.data_dir))
json_files = sorted(path.glob("*.json"))
if json_files:
with json_files[0].open(encoding="utf-8") as f:
payload = json.load(f)
if not isinstance(payload, list):
raise ValueError(
f"Expected JSON array at top level of {json_files[0]}"
)
return [_normalize_item(row) for row in payload]
def _load_items(self) -> list:
"""
Load raw data into structured items.
TODO: Implement data loading. Each item should have at minimum:
- id: unique identifier
- input: the task input (question, instruction, etc.)
- ground_truth: the expected answer
- metadata: optional dict with extra info
Example:
items = []
for path in self.data_dir.glob("*.json"):
data = json.loads(path.read_text())
for entry in data:
items.append({
"id": entry["id"],
"input": entry["question"],
"ground_truth": entry["answer"],
"metadata": {"source": path.name},
})
jsonl_files = sorted(path.glob("*.jsonl"))
if jsonl_files:
items: list[dict] = []
with jsonl_files[0].open(encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
items.append(_normalize_item(json.loads(line)))
return items
"""
raise NotImplementedError("Implement _load_items() for your benchmark")
def _split_by_ratio(self, train_ratio: float, val_ratio: float):
"""Split items by ratio."""
import random
random.shuffle(self.items)
n = len(self.items)
n_train = int(n * train_ratio)
n_val = int(n * val_ratio)
self.splits = {
"train": self.items[:n_train],
"valid": self.items[n_train:n_train + n_val],
"test": self.items[n_train + n_val:],
}
raise FileNotFoundError(
f"No .json or .jsonl file found in {split_path}"
)
def _load_predefined_splits(self, split_dir):
"""Load from pre-split directories."""
# TODO: Implement if your benchmark has pre-defined splits
raise NotImplementedError
def get_split_items(self, split: str) -> list:
"""
Return items for a given split.
Args:
split: One of "train", "valid", "test"
Returns:
List of data items for the requested split
"""
if split not in self.splits:
raise ValueError(f"Unknown split '{split}'. Available: {list(self.splits.keys())}")
return self.splits[split]
# Optional — only needed if you intend to use ``split_mode='ratio'``.
# def load_raw_items(self, data_path: str) -> list[dict]:
# ...
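
The commented-out `load_raw_items` stub could be backed by something like the sketch below. `parse_raw_text` and `read_raw_items` are hypothetical helper names, and the sketch assumes `data_path` points at a single `.json` or `.jsonl` file. In the loader itself each row would presumably also pass through `_normalize_item`; that is an assumption about what the base class expects.

```python
import json
from pathlib import Path


def parse_raw_text(text: str, jsonl: bool) -> list[dict]:
    """Parse file contents as JSON Lines or as one top-level JSON array."""
    if jsonl:
        # One JSON object per line; blank lines are skipped.
        return [json.loads(line) for line in text.splitlines() if line.strip()]
    payload = json.loads(text)
    if not isinstance(payload, list):
        raise ValueError("Expected a JSON array at the top level")
    return payload


def read_raw_items(data_path: str) -> list[dict]:
    """Read every raw item from a single .json or .jsonl file."""
    path = Path(data_path)
    return parse_raw_text(path.read_text(encoding="utf-8"), path.suffix == ".jsonl")
```

Separating parsing from file access makes the format handling easy to check without touching disk.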