docs: align benchmark guide and templates with real adapter API

This commit is contained in:
copilot-swe-agent[bot]
2026-06-01 19:38:17 +00:00
committed by GitHub
parent 36284e1bb0
commit b3c7d72364
8 changed files with 196 additions and 317 deletions

View File

@@ -25,10 +25,10 @@ Open an issue with:
See [Add a New Benchmark](guide/new-benchmark.md) for the implementation guide.
**Checklist:**
- [ ] Data loader in `skillopt/envs/<benchmark>/loader.py`
- [ ] Environment adapter in `skillopt/envs/<benchmark>/env.py`
- [ ] Data loader in `skillopt/envs/<benchmark>/dataloader.py`
- [ ] Environment adapter in `skillopt/envs/<benchmark>/adapter.py`
- [ ] Config file in `configs/<benchmark>/default.yaml`
- [ ] Registration in `skillopt/envs/__init__.py`
- [ ] Registration in `scripts/train.py` (`_ENV_REGISTRY`)
- [ ] Documentation page in `docs/`
### 🤖 New Model Backend

View File

@@ -6,9 +6,10 @@ Extend SkillOpt with your own benchmark in ~100 lines of code.
To add a benchmark, you need:
1. **Data Loader** — Loads and splits your dataset
2. **Environment Adapter** — Executes tasks and returns scores
1. **Data Loader** — Subclass `SplitDataLoader` to load your split data
2. **Environment Adapter** — Subclass `EnvAdapter` and implement rollout/reflect hooks
3. **Config** — YAML configuration file
4. **Registration** — Add your adapter to the train script registry
## Step 1: Create the Benchmark Package
@@ -19,126 +20,71 @@ touch skillopt/envs/my_benchmark/__init__.py
## Step 2: Implement the Data Loader
Create `skillopt/envs/my_benchmark/loader.py`:
Create `skillopt/envs/my_benchmark/dataloader.py`:
```python
from skillopt.data.base import DataLoader, DataItem
from skillopt.datasets.base import SplitDataLoader
class MyBenchmarkDataLoader(DataLoader):
"""Load and split your benchmark data."""
def __init__(self, data_dir: str, **kwargs):
super().__init__(**kwargs)
self.data_dir = data_dir
def setup(self, cfg: dict):
"""Initialize splits based on config."""
self.split_mode = cfg.get('split_mode', 'ratio')
# Load your data here
self.items = self._load_items()
self._create_splits(cfg)
def _load_items(self) -> list[DataItem]:
"""Load raw data into DataItem objects."""
items = []
# TODO: Load your data
for entry in your_data:
items.append(DataItem(
id=entry['id'],
input=entry['question'],
ground_truth=entry['answer'],
metadata=entry.get('metadata', {})
))
return items
def get_split_items(self, split: str) -> list[DataItem]:
"""Return items for a given split (train/valid/test)."""
return self.splits[split]
class MyBenchmarkDataLoader(SplitDataLoader):
    """Load benchmark items from raw data and/or split directories."""

    def load_raw_items(self, data_path: str) -> list[dict]:
        """Parse the source dataset at ``data_path`` (used for ratio mode).

        Each returned dict should have at least a stable "id".
        This example simply defers to the ``SplitDataLoader`` implementation.
        """
        raw_items = super().load_raw_items(data_path)
        return raw_items

    def load_split_items(self, split_path: str) -> list[dict]:
        """Parse one split directory (used for split_dir mode).

        This example simply defers to the ``SplitDataLoader`` implementation.
        """
        split_items = super().load_split_items(split_path)
        return split_items
```
## Step 3: Implement the Environment Adapter
Create `skillopt/envs/my_benchmark/env.py`:
Create `skillopt/envs/my_benchmark/adapter.py`:
```python
from skillopt.envs.base import EnvAdapter, TaskResult
from skillopt.envs.base import EnvAdapter
from skillopt.envs.my_benchmark.dataloader import MyBenchmarkDataLoader
class MyBenchmarkEnv(EnvAdapter):
"""Execute tasks and evaluate results."""
def __init__(self, cfg: dict):
super().__init__(cfg)
async def execute(self, item: DataItem, skill: str, model) -> TaskResult:
"""
Execute a single task.
Args:
item: The data item to process
skill: Current skill document content
model: The target model instance
Returns:
TaskResult with prediction, score, and trajectory
"""
# Build prompt with skill document
prompt = self.build_prompt(item, skill)
# Get model response
response = await model.generate(prompt)
# Extract prediction
prediction = self.parse_response(response)
# Score against ground truth
score = self.evaluate(prediction, item.ground_truth)
return TaskResult(
item_id=item.id,
prediction=prediction,
score=score,
trajectory=[
{"role": "system", "content": skill},
{"role": "user", "content": item.input},
{"role": "assistant", "content": response}
]
)
def evaluate(self, prediction: str, ground_truth: str) -> float:
"""
Score a prediction against ground truth.
Returns:
Float between 0.0 and 1.0
"""
# TODO: Implement your scoring logic
# Examples: exact match, F1, ANLS, etc.
return float(prediction.strip() == ground_truth.strip())
def build_prompt(self, item, skill: str) -> str:
"""Combine skill document with task input."""
return f"{skill}\n\n---\n\nQuestion: {item.input}"
def parse_response(self, response: str) -> str:
"""Extract the answer from model response."""
return response.strip()
class MyBenchmarkAdapter(EnvAdapter):
    """Example adapter that wires ``MyBenchmarkDataLoader`` into the ``EnvAdapter`` hooks."""

    def __init__(self, split_dir: str = "", data_path: str = "", **kwargs):
        # Every extra keyword option is forwarded unchanged to the data loader.
        loader = MyBenchmarkDataLoader(split_dir=split_dir, data_path=data_path, **kwargs)
        self.dataloader = loader

    def setup(self, cfg: dict) -> None:
        """Run the base-class setup, then set up the data loader with the same cfg."""
        super().setup(cfg)
        self.dataloader.setup(cfg)

    def get_dataloader(self):
        """Return the data loader created in ``__init__``."""
        return self.dataloader

    def build_train_env(self, batch_size: int, seed: int, **kwargs):
        """Build a training batch via the data loader and return its payload."""
        train_batch = self.dataloader.build_train_batch(batch_size=batch_size, seed=seed, **kwargs)
        return train_batch.payload

    def build_eval_env(self, env_num: int, split: str, seed: int, **kwargs):
        """Build an evaluation batch for ``split`` via the data loader and return its payload."""
        eval_batch = self.dataloader.build_eval_batch(env_num=env_num, split=split, seed=seed, **kwargs)
        return eval_batch.payload

    def rollout(self, env_manager, skill_content: str, out_dir: str, **kwargs) -> list[dict]:
        """Run the target model on each item in ``env_manager`` and return list[dict].

        Required keys per row: "id", "hard" (0/1), "soft" (0.0-1.0).
        Left unimplemented in this example.
        """
        raise NotImplementedError

    def reflect(self, results: list[dict], skill_content: str, out_dir: str, **kwargs) -> list[dict | None]:
        """Convert failure/success analysis into RawPatch-like dicts.

        Left unimplemented in this example.
        """
        raise NotImplementedError

    def get_task_types(self) -> list[str]:
        """Return the task-type labels handled by this adapter."""
        return ["my_benchmark"]
```
## Step 4: Register the Benchmark
Add to `skillopt/envs/__init__.py`:
Add your adapter to `_register_builtins()` in `scripts/train.py`:
```python
from .my_benchmark.env import MyBenchmarkEnv
from .my_benchmark.loader import MyBenchmarkDataLoader
from skillopt.envs.my_benchmark.adapter import MyBenchmarkAdapter
BENCHMARK_REGISTRY = {
# ... existing benchmarks ...
'my_benchmark': {
'env': MyBenchmarkEnv,
'loader': MyBenchmarkDataLoader,
},
}
_ENV_REGISTRY["my_benchmark"] = MyBenchmarkAdapter
```
## Step 5: Create Config
@@ -146,7 +92,7 @@ BENCHMARK_REGISTRY = {
Create `configs/my_benchmark/default.yaml`:
```yaml
_base_: ['../_base_/default.yaml']
_base_: ../_base_/default.yaml
env:
name: my_benchmark
@@ -178,4 +124,5 @@ python scripts/train.py --config configs/my_benchmark/default.yaml
!!! tip
- Use a small `batch_size` (10-20) for initial testing
- The `evaluate()` method is critical — a noisy metric will confuse the optimizer
- Start from `skillopt/envs/_template/` and adapt from there
- Use an existing adapter (for example `skillopt/envs/officeqa/adapter.py`) as a concrete reference

View File

@@ -4,78 +4,72 @@
### `EnvAdapter`
Abstract base class for benchmark environments.
Abstract base class for benchmark environments (`skillopt/envs/base.py`).
```python
class EnvAdapter(ABC):
async def execute(self, item, skill, model) -> TaskResult
def evaluate(self, prediction, ground_truth) -> float
def build_prompt(self, item, skill) -> str
```
### `DataLoader`
Abstract base class for data loading and splitting.
```python
class DataLoader(ABC):
def setup(self, cfg: dict) -> None
def get_split_items(self, split: str) -> list[DataItem]
def get_dataloader(self) -> BaseDataLoader | None
def build_train_env(self, batch_size: int, seed: int, **kwargs)
def build_eval_env(self, env_num: int, split: str, seed: int, **kwargs)
def rollout(self, env_manager, skill_content: str, out_dir: str, **kwargs) -> list[dict]
def reflect(self, results: list[dict], skill_content: str, out_dir: str, **kwargs) -> list[dict | None]
def get_task_types(self) -> list[str]
```
### `ModelBackend`
Abstract base class for LLM backends.
The rollout contract expects result rows with at least:
```python
class ModelBackend(ABC):
async def generate(self, messages, **kwargs) -> ModelResponse
async def generate_with_tools(self, messages, tools, **kwargs) -> ModelResponse
{"id": str, "hard": int, "soft": float}
```
### `Trainer`
### `BaseDataLoader` / `SplitDataLoader`
Main training loop orchestrator.
Data loader abstractions (`skillopt/datasets/base.py`).
```python
class Trainer:
def __init__(self, cfg: dict)
async def train(self) -> TrainResult
async def evaluate(self, skill: str, split: str) -> EvalResult
class BaseDataLoader(ABC):
def setup(self, cfg: dict) -> None
def build_train_batch(self, batch_size: int, seed: int, **kwargs) -> BatchSpec
def build_eval_batch(self, env_num: int, split: str, seed: int, **kwargs) -> BatchSpec
class SplitDataLoader(BaseDataLoader):
def load_raw_items(self, data_path: str) -> list[dict]
def load_split_items(self, split_path: str) -> list[dict]
def get_split_items(self, split: str) -> list[dict]
```
## Data Classes
### `BatchSpec`
### `DataItem`
Represents one concrete batch request.
```python
@dataclass(slots=True)
class BatchSpec:
phase: str
split: str
seed: int
batch_size: int
payload: object | None = None
metadata: dict[str, Any] = field(default_factory=dict)
```
### `RolloutResult` / `RawPatch`
Typed helpers for stage I/O in `skillopt/types.py`.
```python
@dataclass
class DataItem:
class RolloutResult:
id: str
input: str
ground_truth: str
metadata: dict = field(default_factory=dict)
```
hard: int
soft: float
# optional benchmark-specific fields
### `TaskResult`
```python
@dataclass
class TaskResult:
item_id: str
prediction: str
score: float
trajectory: list[dict]
```
### `ModelResponse`
```python
@dataclass
class ModelResponse:
content: str
usage: dict
model: str
class RawPatch:
patch: Patch
source_type: Literal["failure", "success"] = "failure"
```
For detailed source code, see the [`skillopt/`](https://github.com/microsoft/SkillOpt/tree/main/skillopt) directory.

View File

@@ -13,7 +13,7 @@ This directory provides scaffold files for adding a new benchmark to SkillOpt.
1. Copy this directory: `cp -r skillopt/envs/_template skillopt/envs/your_benchmark`
2. Rename files: remove `_template` suffix
3. Implement the `TODO` sections
4. Register in `skillopt/envs/__init__.py`
4. Register your adapter in `_ENV_REGISTRY` inside `scripts/train.py`
5. Create config at `configs/your_benchmark/default.yaml`
See the [documentation](../../docs/guide/new-benchmark.md) for the full guide.

View File

@@ -5,11 +5,11 @@
# and customize the values below.
# Inherit global defaults
_base_: ['../_base_/default.yaml']
_base_: ../_base_/default.yaml
# ── Environment ──────────────────────────────────
env:
name: your_benchmark # Must match registry key
name: your_benchmark # Must match _ENV_REGISTRY key in scripts/train.py
data_path: data/your_benchmark # Path to your data
split_mode: ratio # "ratio" or "split_dir"
split_ratio: "2:1:7" # train:val:test

View File

@@ -4,89 +4,78 @@ Benchmark Environment Template
Copy this file and implement the TODO sections to add a new benchmark.
The EnvAdapter is responsible for:
1. Executing tasks using the target model + current skill document
2. Evaluating predictions against ground truth
3. Returning structured results for the training loop
1. Building train/eval environment payloads
2. Running rollout and returning scored result rows
3. Reflecting on results and returning patch candidates
"""
from __future__ import annotations
from skillopt.datasets.base import BatchSpec
from skillopt.envs._template.loader_template import TemplateBenchmarkDataLoader
from skillopt.envs.base import EnvAdapter
class TemplateBenchmarkEnv(EnvAdapter):
class TemplateBenchmarkAdapter(EnvAdapter):
"""
Environment adapter for <Your Benchmark Name>.
Rename this class and implement the abstract methods below.
"""
def __init__(self, cfg: dict):
super().__init__(cfg)
# TODO: Initialize benchmark-specific state
# Example: self.tools = load_tools(cfg)
def __init__(
self,
split_dir: str = "",
data_path: str = "",
split_mode: str = "ratio",
split_ratio: str = "2:1:7",
split_seed: int = 42,
split_output_dir: str = "",
seed: int = 42,
limit: int = 0,
**kwargs,
) -> None:
self.dataloader = TemplateBenchmarkDataLoader(
split_dir=split_dir,
data_path=data_path,
split_mode=split_mode,
split_ratio=split_ratio,
split_seed=split_seed,
split_output_dir=split_output_dir,
seed=seed,
limit=limit,
)
# TODO: initialize benchmark-specific runtime options from kwargs
async def execute(self, item, skill: str, model):
def setup(self, cfg: dict) -> None:
super().setup(cfg)
self.dataloader.setup(cfg)
def get_dataloader(self):
return self.dataloader
def build_env_from_batch(self, batch: BatchSpec, **kwargs):
return list(batch.payload or [])
def build_train_env(self, batch_size: int, seed: int, **kwargs):
batch = self.dataloader.build_train_batch(batch_size=batch_size, seed=seed, **kwargs)
return self.build_env_from_batch(batch, **kwargs)
def build_eval_env(self, env_num: int, split: str, seed: int, **kwargs):
batch = self.dataloader.build_eval_batch(env_num=env_num, split=split, seed=seed, **kwargs)
return self.build_env_from_batch(batch, **kwargs)
def rollout(self, env_manager, skill_content: str, out_dir: str, **kwargs) -> list[dict]:
"""
Execute a single task with the target model.
Args:
item: DataItem with .id, .input, .ground_truth, .metadata
skill: Current skill document content (Markdown string)
model: Target model backend instance
Returns:
TaskResult with prediction, score, and trajectory
Run one batch and return list[dict] with at least:
{"id": str, "hard": int, "soft": float}
"""
# Step 1: Build the prompt combining skill + task input
prompt = self.build_prompt(item, skill)
raise NotImplementedError("Implement rollout() for your benchmark")
# Step 2: Call the target model
# TODO: Customize the message format for your benchmark
messages = [
{"role": "system", "content": skill},
{"role": "user", "content": item.input},
]
response = await model.generate(messages)
# Step 3: Parse the model response into a prediction
prediction = self.parse_response(response.content)
# Step 4: Score the prediction
score = self.evaluate(prediction, item.ground_truth)
# Step 5: Return structured result
return {
"item_id": item.id,
"prediction": prediction,
"score": score,
"trajectory": messages + [{"role": "assistant", "content": response.content}],
}
def evaluate(self, prediction: str, ground_truth: str) -> float:
def reflect(self, results: list[dict], skill_content: str, out_dir: str, **kwargs) -> list[dict | None]:
"""
Score a prediction against the ground truth.
Returns:
Float between 0.0 (wrong) and 1.0 (correct)
TODO: Implement your scoring metric. Common options:
- Exact match: float(pred.strip().lower() == gt.strip().lower())
- F1 score: compute token overlap
- ANLS: for document QA tasks
- Custom: any float in [0, 1]
Reflect on rollout results and return patch dicts (or None entries).
"""
# Placeholder — exact match
return float(prediction.strip().lower() == ground_truth.strip().lower())
raise NotImplementedError("Implement reflect() for your benchmark")
def build_prompt(self, item, skill: str) -> str:
"""Combine skill document with task input."""
return f"{skill}\n\n---\n\nQuestion: {item.input}"
def parse_response(self, response: str) -> str:
"""
Extract the answer from the model's raw response.
TODO: Implement extraction logic. For example:
- Extract text after "Answer:"
- Parse JSON output
- Extract from code blocks
"""
return response.strip()
def get_task_types(self) -> list[str]:
return ["your_benchmark"]

View File

@@ -3,101 +3,37 @@ Benchmark Data Loader Template
================================
Copy this file and implement the TODO sections to load your benchmark data.
The DataLoader is responsible for:
1. Loading raw data from disk
2. Splitting into train / validation / test sets
3. Providing DataItem objects to the training loop
The SplitDataLoader is responsible for:
1. Loading raw data from disk for ratio split mode
2. Loading items from train/val/test directories for split_dir mode
3. Returning list[dict] items used by the training loop
"""
from pathlib import Path
from __future__ import annotations
from skillopt.datasets.base import SplitDataLoader
class TemplateBenchmarkLoader:
class TemplateBenchmarkDataLoader(SplitDataLoader):
"""
Data loader for <Your Benchmark Name>.
Rename this class and implement the methods below.
"""
def __init__(self, data_dir: str = "data/your_benchmark", **kwargs):
self.data_dir = Path(data_dir)
self.items = []
self.splits = {}
def setup(self, cfg: dict):
def load_raw_items(self, data_path: str) -> list[dict]:
"""
Initialize the loader with config.
Called once before training starts.
Args:
cfg: Dict with keys like 'split_mode', 'train_ratio', 'val_ratio', etc.
"""
# Step 1: Load raw data
self.items = self._load_items()
Parse raw benchmark data for split_mode="ratio".
# Step 2: Create splits
split_mode = cfg.get("split_mode", "ratio")
if split_mode == "ratio":
self._split_by_ratio(
train_ratio=cfg.get("train_ratio", 0.7),
val_ratio=cfg.get("val_ratio", 0.15),
)
elif split_mode == "split_dir":
self._load_predefined_splits(cfg.get("split_dir", self.data_dir))
def _load_items(self) -> list:
Return a list of normalized item dicts.
"""
Load raw data into structured items.
TODO: Implement data loading. Each item should have at minimum:
- id: unique identifier
- input: the task input (question, instruction, etc.)
- ground_truth: the expected answer
- metadata: optional dict with extra info
Example:
items = []
for path in self.data_dir.glob("*.json"):
data = json.loads(path.read_text())
for entry in data:
items.append({
"id": entry["id"],
"input": entry["question"],
"ground_truth": entry["answer"],
"metadata": {"source": path.name},
})
return items
"""
raise NotImplementedError("Implement _load_items() for your benchmark")
# TODO: customize when your raw source format differs.
return super().load_raw_items(data_path)
def _split_by_ratio(self, train_ratio: float, val_ratio: float):
"""Split items by ratio."""
import random
random.shuffle(self.items)
n = len(self.items)
n_train = int(n * train_ratio)
n_val = int(n * val_ratio)
self.splits = {
"train": self.items[:n_train],
"valid": self.items[n_train:n_train + n_val],
"test": self.items[n_train + n_val:],
}
def _load_predefined_splits(self, split_dir):
"""Load from pre-split directories."""
# TODO: Implement if your benchmark has pre-defined splits
raise NotImplementedError
def get_split_items(self, split: str) -> list:
def load_split_items(self, split_path: str) -> list[dict]:
"""
Return items for a given split.
Args:
split: One of "train", "valid", "test"
Returns:
List of data items for the requested split
Parse one split directory for split_mode="split_dir".
split_path points to train/, val/, or test/.
"""
if split not in self.splits:
raise ValueError(f"Unknown split '{split}'. Available: {list(self.splits.keys())}")
return self.splits[split]
# TODO: customize when each split directory has a custom layout.
return super().load_split_items(split_path)

View File

@@ -0,0 +1,13 @@
from skillopt.datasets.base import SplitDataLoader
from skillopt.envs._template.env_template import TemplateBenchmarkAdapter
from skillopt.envs._template.loader_template import TemplateBenchmarkDataLoader
def test_template_adapter_is_concrete():
    """The template adapter can be instantiated with defaults and reports its task type."""
    task_types = TemplateBenchmarkAdapter().get_task_types()
    assert task_types == ["your_benchmark"]
def test_template_loader_uses_split_dataloader():
    """The template loader can be built with defaults and is a ``SplitDataLoader``."""
    template_loader = TemplateBenchmarkDataLoader()
    assert isinstance(template_loader, SplitDataLoader)