mirror of
https://github.com/github/spec-kit.git
synced 2026-07-03 20:36:23 +08:00
feat(workflows): allow resume to accept updated workflow inputs (#2815)
`workflow resume` now accepts `--input key=value` (the same flag and parsing as `workflow run`, via a shared `_parse_input_values` helper). Supplied values are merged over the run's persisted inputs and re-resolved through the existing typed-validation path (`_resolve_inputs`), so a resumed/re-run step sees the updated inputs and ill-typed values fail fast. Keys not supplied keep their persisted values; resuming without `--input` is unchanged. Reference docs updated. Distinct from #2405 (file-reference inputs at run time): this is about supplying inputs at resume time, reusing the existing input model. Closes #2812. Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -28,8 +28,18 @@ specify workflow run speckit -i spec="Build a kanban board with drag-and-drop ta
|
||||
specify workflow resume <run_id>
|
||||
```
|
||||
|
||||
| Option | Description |
|
||||
| ------------------- | -------------------------------------------------------- |
|
||||
| `-i` / `--input` | Updated input values as `key=value` (repeatable) |
|
||||
|
||||
Resumes a paused or failed workflow run from the exact step where it stopped. Useful after responding to a gate step or fixing an issue that caused a failure.
|
||||
|
||||
Supplied `--input` values are merged over the run's stored inputs and re-validated against the workflow's input types, then the blocked step is re-run with the updated values. This lets a run continue with information that only became available after it paused, or with a corrected value after a failure:
|
||||
|
||||
```bash
|
||||
specify workflow resume <run_id> --input cmd="exit 0"
|
||||
```
|
||||
|
||||
## Workflow Status
|
||||
|
||||
```bash
|
||||
|
||||
@@ -2717,6 +2717,22 @@ workflow_catalog_app = typer.Typer(
|
||||
workflow_app.add_typer(workflow_catalog_app, name="catalog")
|
||||
|
||||
|
||||
def _parse_input_values(input_values: list[str] | None) -> dict[str, Any]:
|
||||
"""Parse repeated ``key=value`` CLI inputs into a dict.
|
||||
|
||||
Shared by ``workflow run`` and ``workflow resume``. Exits with an error
|
||||
on any entry missing ``=``.
|
||||
"""
|
||||
inputs: dict[str, Any] = {}
|
||||
for kv in input_values or []:
|
||||
if "=" not in kv:
|
||||
console.print(f"[red]Error:[/red] Invalid input format: {kv!r} (expected key=value)")
|
||||
raise typer.Exit(1)
|
||||
key, _, value = kv.partition("=")
|
||||
inputs[key.strip()] = value.strip()
|
||||
return inputs
|
||||
|
||||
|
||||
@workflow_app.command("run")
|
||||
def workflow_run(
|
||||
source: str = typer.Argument(..., help="Workflow ID or YAML file path"),
|
||||
@@ -2749,14 +2765,7 @@ def workflow_run(
|
||||
raise typer.Exit(1)
|
||||
|
||||
# Parse inputs
|
||||
inputs: dict[str, Any] = {}
|
||||
if input_values:
|
||||
for kv in input_values:
|
||||
if "=" not in kv:
|
||||
console.print(f"[red]Error:[/red] Invalid input format: {kv!r} (expected key=value)")
|
||||
raise typer.Exit(1)
|
||||
key, _, value = kv.partition("=")
|
||||
inputs[key.strip()] = value.strip()
|
||||
inputs = _parse_input_values(input_values)
|
||||
|
||||
console.print(f"\n[bold cyan]Running workflow:[/bold cyan] {definition.name} ({definition.id})")
|
||||
console.print(f"[dim]Version: {definition.version}[/dim]\n")
|
||||
@@ -2787,6 +2796,9 @@ def workflow_run(
|
||||
@workflow_app.command("resume")
|
||||
def workflow_resume(
|
||||
run_id: str = typer.Argument(..., help="Run ID to resume"),
|
||||
input_values: list[str] | None = typer.Option(
|
||||
None, "--input", "-i", help="Updated input values as key=value pairs"
|
||||
),
|
||||
):
|
||||
"""Resume a paused or failed workflow run."""
|
||||
from .workflows.engine import WorkflowEngine
|
||||
@@ -2795,8 +2807,10 @@ def workflow_resume(
|
||||
engine = WorkflowEngine(project_root)
|
||||
engine.on_step_start = lambda sid, label: console.print(f" \u25b8 [{sid}] {label} \u2026")
|
||||
|
||||
inputs = _parse_input_values(input_values)
|
||||
|
||||
try:
|
||||
state = engine.resume(run_id)
|
||||
state = engine.resume(run_id, inputs or None)
|
||||
except FileNotFoundError:
|
||||
console.print(f"[red]Error:[/red] Run not found: {run_id}")
|
||||
raise typer.Exit(1)
|
||||
|
||||
@@ -507,8 +507,19 @@ class WorkflowEngine:
|
||||
state.save()
|
||||
return state
|
||||
|
||||
def resume(self, run_id: str) -> RunState:
|
||||
"""Resume a paused or failed workflow run."""
|
||||
def resume(
|
||||
self,
|
||||
run_id: str,
|
||||
inputs: dict[str, Any] | None = None,
|
||||
) -> RunState:
|
||||
"""Resume a paused or failed workflow run.
|
||||
|
||||
When ``inputs`` is provided, the values are merged over the run's
|
||||
persisted inputs and re-resolved through the same typed validation
|
||||
path used by :meth:`execute`, so the resumed step sees updated
|
||||
workflow inputs. Keys not supplied keep their persisted values; an
|
||||
empty/``None`` ``inputs`` leaves the run's inputs unchanged.
|
||||
"""
|
||||
state = RunState.load(run_id, self.project_root)
|
||||
if state.status not in (RunStatus.PAUSED, RunStatus.FAILED):
|
||||
msg = f"Cannot resume run {run_id!r} with status {state.status.value!r}."
|
||||
@@ -524,6 +535,12 @@ class WorkflowEngine:
|
||||
else:
|
||||
definition = self.load_workflow(state.workflow_id)
|
||||
|
||||
# Merge any newly-supplied inputs over the persisted ones and
|
||||
# re-validate through the same typing path as the initial run.
|
||||
if inputs:
|
||||
merged = {**state.inputs, **inputs}
|
||||
state.inputs = self._resolve_inputs(definition, merged)
|
||||
|
||||
# Restore context
|
||||
context = StepContext(
|
||||
inputs=state.inputs,
|
||||
|
||||
@@ -3026,3 +3026,118 @@ steps:
|
||||
assert state.status == RunStatus.COMPLETED
|
||||
assert "do-plan" in state.step_results
|
||||
assert "do-specify" not in state.step_results
|
||||
|
||||
|
||||
class TestResumeWithInputs:
|
||||
"""Test that `workflow resume` can accept updated workflow inputs."""
|
||||
|
||||
_WF_CMD = """
|
||||
schema_version: "1.0"
|
||||
workflow:
|
||||
id: "resume-cmd-wf"
|
||||
name: "Resume Cmd WF"
|
||||
version: "1.0.0"
|
||||
inputs:
|
||||
cmd:
|
||||
type: string
|
||||
default: "exit 1"
|
||||
steps:
|
||||
- id: s
|
||||
type: shell
|
||||
run: "{{ inputs.cmd }}"
|
||||
"""
|
||||
|
||||
_WF_NUM = """
|
||||
schema_version: "1.0"
|
||||
workflow:
|
||||
id: "resume-num-wf"
|
||||
name: "Resume Num WF"
|
||||
version: "1.0.0"
|
||||
inputs:
|
||||
count:
|
||||
type: number
|
||||
default: 1
|
||||
steps:
|
||||
- id: gate
|
||||
type: gate
|
||||
message: "Review"
|
||||
options: [approve, reject]
|
||||
"""
|
||||
|
||||
def _engine(self, project_dir):
|
||||
from specify_cli.workflows.engine import WorkflowEngine
|
||||
return WorkflowEngine(project_dir)
|
||||
|
||||
def test_resume_with_input_reruns_step_with_new_value(self, project_dir):
|
||||
from specify_cli.workflows.engine import WorkflowDefinition
|
||||
from specify_cli.workflows.base import RunStatus
|
||||
|
||||
definition = WorkflowDefinition.from_string(self._WF_CMD)
|
||||
engine = self._engine(project_dir)
|
||||
|
||||
state = engine.execute(definition)
|
||||
assert state.status == RunStatus.FAILED # "exit 1" fails
|
||||
|
||||
resumed = engine.resume(state.run_id, {"cmd": "exit 0"})
|
||||
assert resumed.status == RunStatus.COMPLETED
|
||||
assert resumed.inputs["cmd"] == "exit 0"
|
||||
|
||||
def test_resume_without_input_preserves_inputs(self, project_dir):
|
||||
from specify_cli.workflows.engine import WorkflowDefinition
|
||||
from specify_cli.workflows.base import RunStatus
|
||||
|
||||
definition = WorkflowDefinition.from_string(self._WF_CMD)
|
||||
engine = self._engine(project_dir)
|
||||
|
||||
state = engine.execute(definition)
|
||||
assert state.status == RunStatus.FAILED
|
||||
|
||||
resumed = engine.resume(state.run_id)
|
||||
assert resumed.status == RunStatus.FAILED # still "exit 1"
|
||||
assert resumed.inputs["cmd"] == "exit 1"
|
||||
|
||||
def test_resume_merges_and_coerces_typed_input(self, project_dir):
|
||||
import json as _json
|
||||
from specify_cli.workflows.engine import WorkflowDefinition
|
||||
from specify_cli.workflows.base import RunStatus
|
||||
|
||||
definition = WorkflowDefinition.from_string(self._WF_NUM)
|
||||
engine = self._engine(project_dir)
|
||||
|
||||
state = engine.execute(definition)
|
||||
assert state.status == RunStatus.PAUSED
|
||||
|
||||
resumed = engine.resume(state.run_id, {"count": "5"})
|
||||
assert resumed.inputs["count"] == 5 # coerced string -> number
|
||||
|
||||
inputs_file = (
|
||||
project_dir / ".specify" / "workflows" / "runs" / state.run_id / "inputs.json"
|
||||
)
|
||||
assert _json.loads(inputs_file.read_text())["inputs"]["count"] == 5
|
||||
|
||||
def test_resume_invalid_typed_input_raises(self, project_dir):
|
||||
from specify_cli.workflows.engine import WorkflowDefinition
|
||||
|
||||
definition = WorkflowDefinition.from_string(self._WF_NUM)
|
||||
engine = self._engine(project_dir)
|
||||
|
||||
state = engine.execute(definition)
|
||||
with pytest.raises(ValueError):
|
||||
engine.resume(state.run_id, {"count": "not-a-number"})
|
||||
|
||||
def test_cli_resume_input_invalid_format_errors(self, project_dir):
|
||||
from typer.testing import CliRunner
|
||||
from unittest.mock import patch
|
||||
from specify_cli import app
|
||||
from specify_cli.workflows.engine import WorkflowDefinition
|
||||
|
||||
definition = WorkflowDefinition.from_string(self._WF_NUM)
|
||||
state = self._engine(project_dir).execute(definition)
|
||||
|
||||
runner = CliRunner()
|
||||
with patch.object(Path, "cwd", return_value=project_dir):
|
||||
result = runner.invoke(
|
||||
app, ["workflow", "resume", state.run_id, "--input", "bogus"]
|
||||
)
|
||||
assert result.exit_code == 1
|
||||
assert "Invalid input format" in result.stdout
|
||||
|
||||
Reference in New Issue
Block a user