Files
github-spec-kit/src/specify_cli/workflows/engine.py
Ali jawwad d378485696 fix(workflows): reject infinite number-input default instead of raising OverflowError (#3199)
WorkflowEngine._coerce_input normalizes a whole-valued number to int via int(value). For an infinite float (e.g. a 'type: number' input with YAML 'default: .inf') int(inf) raises OverflowError, which is not in the except (ValueError, TypeError) tuple. validate_workflow eager-coerces declared defaults and is documented to RETURN a list of errors, but it only catches ValueError -- so the OverflowError escaped and validate_workflow raised instead of reporting, breaking its contract. (NaN already surfaced cleanly because int(nan) raises ValueError.)

Add OverflowError to the except tuple so an infinite default surfaces as the same clean 'expected a number' ValueError as NaN, consistent with the function's existing fail-fast-on-authoring-mistakes design. Finite values (5.0 -> 5, 3.5 -> 3.5) are unaffected.

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 10:05:51 -05:00

1071 lines
44 KiB
Python

"""Workflow engine — loads, validates, and executes workflow YAML definitions.
The engine is the orchestrator that:
- Parses workflow YAML definitions
- Validates step configurations and requirements
- Executes steps sequentially, dispatching to the correct step type
- Manages state persistence for resume capability
- Handles control flow (branching, loops, fan-out/fan-in)
"""
from __future__ import annotations
import json
import os
import re
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import yaml
from ..integration_state import (
default_integration_key,
try_read_integration_json,
)
from .base import RunStatus, StepContext, StepResult, StepStatus
# -- Workflow Definition --------------------------------------------------
class WorkflowDefinition:
"""Parsed and validated workflow YAML definition."""
def __init__(self, data: dict[str, Any], source_path: Path | None = None) -> None:
self.data = data
self.source_path = source_path
workflow = data.get("workflow", {})
self.id: str = workflow.get("id", "")
self.name: str = workflow.get("name", "")
self.version: str = workflow.get("version", "0.0.0")
self.author: str = workflow.get("author", "")
self.description: str = workflow.get("description", "")
self.schema_version: str = data.get("schema_version", "1.0")
# Defaults
self.default_integration: str | None = workflow.get("integration")
self.default_model: str | None = workflow.get("model")
self.default_options: dict[str, Any] = workflow.get("options") or {}
if not isinstance(self.default_options, dict):
self.default_options = {}
# Advisory pre-conditions (spec-kit version / integrations a workflow
# expects). Validated by ``validate_workflow`` (recognized keys only;
# see ``_RECOGNIZED_REQUIRES_KEYS``) but NOT enforced at run time — they
# are not a security boundary. In particular there is no
# ``requires.permissions`` capability gate: shell steps always run with
# the user's privileges.
#
# Holds the raw parsed value, so before ``validate_workflow`` runs it may
# be a non-mapping (``None`` for a bare ``requires:``, a list for
# ``requires: []``, etc.); typed ``Any`` rather than ``dict[str, Any]``
# to avoid implying it is always a mapping at this point.
self.requires: Any = data.get("requires", {})
# Inputs
self.inputs: dict[str, Any] = data.get("inputs", {})
# Steps
self.steps: list[dict[str, Any]] = data.get("steps", [])
@classmethod
def from_yaml(cls, path: Path) -> WorkflowDefinition:
"""Load a workflow definition from a YAML file."""
with open(path, encoding="utf-8") as f:
data = yaml.safe_load(f)
if not isinstance(data, dict):
msg = f"Workflow YAML must be a mapping, got {type(data).__name__}."
raise ValueError(msg)
return cls(data, source_path=path)
@classmethod
def from_string(cls, content: str) -> WorkflowDefinition:
"""Load a workflow definition from a YAML string."""
data = yaml.safe_load(content)
if not isinstance(data, dict):
msg = f"Workflow YAML must be a mapping, got {type(data).__name__}."
raise ValueError(msg)
return cls(data)
# -- Workflow Validation --------------------------------------------------
# ID format: lowercase alphanumeric with hyphens
_ID_PATTERN = re.compile(r"^[a-z0-9][a-z0-9-]*[a-z0-9]$|^[a-z0-9]$")
# Keys accepted under a workflow's ``requires`` block: the advisory
# pre-conditions documented for workflows (``speckit_version`` and
# ``integrations``). This is the *workflow* schema only — the bundle manifest's
# ``requires`` (see ``bundler/models/manifest.py``) is a separate schema that
# also carries ``tools``/``mcp``; those are not workflow ``requires`` keys.
# Any other key — notably ``permissions`` — is rejected by ``validate_workflow``
# so it is never mistaken for an enforced runtime control.
_RECOGNIZED_REQUIRES_KEYS = frozenset({"speckit_version", "integrations"})
# Valid step types (matching STEP_REGISTRY keys)
def _get_valid_step_types() -> set[str]:
"""Return valid step types from the registry, with a built-in fallback."""
from . import STEP_REGISTRY
if STEP_REGISTRY:
return set(STEP_REGISTRY.keys())
return {
"command", "shell", "prompt", "gate", "if", "init",
"switch", "while", "do-while", "fan-out", "fan-in",
}
def validate_workflow(definition: WorkflowDefinition) -> list[str]:
"""Validate a workflow definition and return a list of error messages.
An empty list means the workflow is valid.
"""
errors: list[str] = []
# -- Schema version ---------------------------------------------------
if definition.schema_version not in ("1.0", "1"):
errors.append(
f"Unsupported schema_version {definition.schema_version!r}. "
f"Expected '1.0'."
)
# -- Top-level fields -------------------------------------------------
if not definition.id:
errors.append("Workflow is missing 'workflow.id'.")
elif not _ID_PATTERN.match(definition.id):
errors.append(
f"Workflow ID {definition.id!r} must be lowercase alphanumeric "
f"with hyphens."
)
if not definition.name:
errors.append("Workflow is missing 'workflow.name'.")
if not definition.version:
errors.append("Workflow is missing 'workflow.version'.")
elif not re.match(r"^\d+\.\d+\.\d+$", definition.version):
errors.append(
f"Workflow version {definition.version!r} is not valid "
f"semantic versioning (expected X.Y.Z)."
)
# -- Inputs -----------------------------------------------------------
if not isinstance(definition.inputs, dict):
errors.append("'inputs' must be a mapping (or omitted).")
else:
for input_name, input_def in definition.inputs.items():
if not isinstance(input_def, dict):
errors.append(f"Input {input_name!r} must be a mapping.")
continue
input_type = input_def.get("type")
if input_type and input_type not in ("string", "number", "boolean"):
errors.append(
f"Input {input_name!r} has invalid type {input_type!r}. "
f"Must be 'string', 'number', or 'boolean'."
)
# Validate the default eagerly so authoring mistakes (e.g. a
# default not in the declared enum, or a non-numeric default for
# a number input) surface at install/validation time instead of
# at workflow-execution time. ``"auto"`` for the integration
# input is a runtime-resolved sentinel, so only the
# enum-membership check is exempted for that exact case — the
# declared type is still enforced (e.g. ``type: number`` paired
# with ``default: "auto"`` is still rejected).
if "default" in input_def:
default_value = input_def["default"]
is_auto_integration = (
input_name == "integration" and default_value == "auto"
)
validation_input_def: dict[str, Any] = input_def
if is_auto_integration and "enum" in input_def:
validation_input_def = {
key: value
for key, value in input_def.items()
if key != "enum"
}
try:
WorkflowEngine._coerce_input(
input_name, default_value, validation_input_def
)
except ValueError as exc:
errors.append(
f"Input {input_name!r} has invalid default: {exc}"
)
# -- Requires ---------------------------------------------------------
# ``requires`` declares advisory pre-conditions (the spec-kit version and
# integrations a workflow expects). Only a fixed set of keys is recognized;
# reject anything else so authoring typos surface here instead of being
# silently ignored at runtime. In particular ``requires.permissions`` is
# rejected explicitly: it reads like a runtime capability gate, but no such
# gate exists — a ``shell`` step always runs with the user's privileges, so
# declaring it would give a false sense of sandboxing.
#
# Mirror ``inputs`` validation: an omitted block defaults to ``{}`` and is
# valid, but any present-but-non-mapping value — ``requires:`` (YAML null),
# ``requires: []`` or ``requires: ''`` — is an authoring error and must
# surface here rather than be silently ignored at runtime.
if not isinstance(definition.requires, dict):
errors.append("'requires' must be a mapping (or omitted).")
else:
for key in definition.requires:
if key == "permissions":
errors.append(
"'requires.permissions' is not a recognized or "
"enforced capability gate — shell steps always run "
"with the user's privileges. Remove it and gate "
"sensitive steps with a 'gate' step instead."
)
elif key not in _RECOGNIZED_REQUIRES_KEYS:
errors.append(
f"Unknown 'requires' key {key!r}. Recognized keys: "
f"{', '.join(sorted(_RECOGNIZED_REQUIRES_KEYS))}."
)
# -- Steps ------------------------------------------------------------
if not isinstance(definition.steps, list):
errors.append("'steps' must be a list.")
return errors
if not definition.steps:
errors.append("Workflow has no steps defined.")
seen_ids: set[str] = set()
_validate_steps(definition.steps, seen_ids, errors)
return errors
def _validate_steps(
steps: list[dict[str, Any]],
seen_ids: set[str],
errors: list[str],
) -> None:
"""Recursively validate a list of steps."""
from . import STEP_REGISTRY
for step_config in steps:
if not isinstance(step_config, dict):
errors.append(f"Step must be a mapping, got {type(step_config).__name__}.")
continue
step_id = step_config.get("id")
if not step_id:
errors.append("Step is missing 'id' field.")
continue
if ":" in step_id:
errors.append(
f"Step ID {step_id!r} contains ':' which is reserved "
f"for engine-generated nested IDs (parentId:childId)."
)
if step_id in seen_ids:
errors.append(f"Duplicate step ID {step_id!r}.")
seen_ids.add(step_id)
# Determine step type
step_type = step_config.get("type", "command")
if step_type not in _get_valid_step_types():
errors.append(
f"Step {step_id!r} has invalid type {step_type!r}."
)
continue
# Delegate to step-specific validation
step_impl = STEP_REGISTRY.get(step_type)
if step_impl:
step_errors = step_impl.validate(step_config)
errors.extend(step_errors)
# Validate optional `continue_on_error` field. The engine honours
# this on any step that returns StepStatus.FAILED so the pipeline can route
# around the failure via a downstream `if` or `switch` (or a
# `gate` that surfaces the failure to the operator via message
# interpolation). The field must be a literal boolean —
# coercion from truthy strings is deliberately not supported so
# authoring mistakes surface at validation time rather than
# silently changing run semantics.
if "continue_on_error" in step_config:
coe = step_config["continue_on_error"]
if not isinstance(coe, bool):
errors.append(
f"Step {step_id!r}: 'continue_on_error' must be a "
f"boolean, got {type(coe).__name__}."
)
# Recursively validate nested steps
for nested_key in ("then", "else", "steps"):
nested = step_config.get(nested_key)
if isinstance(nested, list):
_validate_steps(nested, seen_ids, errors)
# Validate switch cases
cases = step_config.get("cases")
if isinstance(cases, dict):
for _case_key, case_steps in cases.items():
if isinstance(case_steps, list):
_validate_steps(case_steps, seen_ids, errors)
# Validate switch default
default = step_config.get("default")
if isinstance(default, list):
_validate_steps(default, seen_ids, errors)
# Validate fan-out nested step (template — not added to seen_ids
# since the engine generates parentId:templateId:index at runtime)
fan_step = step_config.get("step")
if isinstance(fan_step, dict):
fan_errors: list[str] = []
_validate_steps([fan_step], set(), fan_errors)
errors.extend(fan_errors)
# -- Run State Persistence ------------------------------------------------
class RunState:
"""Manages workflow run state for persistence and resume."""
# ``run_id`` is interpolated into a filesystem path (``runs/<run_id>``)
# by both ``save()`` and ``load()``. Constrain it to a charset that
# cannot contain path separators (``/`` ``\``), parent-directory
# segments (``..``), or NULs — anything that could escape the
# ``.specify/workflows/runs/`` directory or be mis-interpreted by the
# filesystem. The first-character anchor blocks IDs that start with
# ``-`` (which would be mistaken for a CLI flag in error messages
# and shell completions).
_RUN_ID_PATTERN = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9_-]*$")
@classmethod
def _validate_run_id(cls, run_id: str) -> None:
"""Raise ``ValueError`` if ``run_id`` is not a safe path component.
This is the single source of truth for what counts as a valid
``run_id``. ``__init__`` calls it to reject malformed IDs at
construction time; ``load`` calls it *before* interpolating the
ID into a path so a malicious value cannot probe or read files
outside ``.specify/workflows/runs/<run_id>/``.
"""
if not isinstance(run_id, str) or not cls._RUN_ID_PATTERN.match(run_id):
raise ValueError(
f"Invalid run_id {run_id!r}: must be alphanumeric with "
"hyphens/underscores only (and must start with an "
"alphanumeric character)."
)
def __init__(
self,
run_id: str | None = None,
workflow_id: str = "",
project_root: Path | None = None,
) -> None:
# ``run_id is None`` (omitted) → auto-generate. An explicit empty
# string is *not* the same as "omitted" and must be validated like
# any other caller-provided value — otherwise ``__init__("")``
# would silently substitute a UUID while ``load("")`` rejects, and
# the two entry points would diverge on the empty-string vector.
if run_id is None:
self.run_id = str(uuid.uuid4())[:8]
else:
self.run_id = run_id
self._validate_run_id(self.run_id)
self.workflow_id = workflow_id
self.project_root = project_root or Path(".")
self.status = RunStatus.CREATED
self.current_step_index = 0
self.current_step_id: str | None = None
self.step_results: dict[str, dict[str, Any]] = {}
self.inputs: dict[str, Any] = {}
self.created_at = datetime.now(timezone.utc).isoformat()
self.updated_at = self.created_at
self.log_entries: list[dict[str, Any]] = []
@property
def runs_dir(self) -> Path:
return self.project_root / ".specify" / "workflows" / "runs" / self.run_id
def save(self) -> None:
"""Persist current state to disk."""
self.updated_at = datetime.now(timezone.utc).isoformat()
runs_dir = self.runs_dir
runs_dir.mkdir(parents=True, exist_ok=True)
state_data = {
"run_id": self.run_id,
"workflow_id": self.workflow_id,
"status": self.status.value,
"current_step_index": self.current_step_index,
"current_step_id": self.current_step_id,
"step_results": self.step_results,
"created_at": self.created_at,
"updated_at": self.updated_at,
}
with open(runs_dir / "state.json", "w", encoding="utf-8") as f:
json.dump(state_data, f, indent=2)
inputs_data = {"inputs": self.inputs}
with open(runs_dir / "inputs.json", "w", encoding="utf-8") as f:
json.dump(inputs_data, f, indent=2)
@classmethod
def load(cls, run_id: str, project_root: Path) -> RunState:
"""Load a run state from disk.
Validates ``run_id`` against ``_RUN_ID_PATTERN`` *before* building
the lookup path. Without this guard, a caller passing a value like
``../escape`` (e.g. via ``specify workflow resume`` CLI argument)
would interpolate path-traversal segments into
``runs_dir`` below, letting ``state_path.exists()`` probe arbitrary
paths and ``json.load`` read attacker-planted JSON from outside
the project's ``runs/`` directory. ``__init__`` already runs this
check on the stored ``state_data["run_id"]``, but that fires
*after* the file lookup — too late to prevent the disclosure.
Mirrors the precedent in ``agents._ensure_within_directory``.
"""
cls._validate_run_id(run_id)
runs_dir = project_root / ".specify" / "workflows" / "runs" / run_id
state_path = runs_dir / "state.json"
if not state_path.exists():
msg = f"Run state not found: {state_path}"
raise FileNotFoundError(msg)
with open(state_path, encoding="utf-8") as f:
state_data = json.load(f)
state = cls(
run_id=state_data["run_id"],
workflow_id=state_data["workflow_id"],
project_root=project_root,
)
state.status = RunStatus(state_data["status"])
state.current_step_index = state_data.get("current_step_index", 0)
state.current_step_id = state_data.get("current_step_id")
state.step_results = state_data.get("step_results", {})
state.created_at = state_data.get("created_at", "")
state.updated_at = state_data.get("updated_at", "")
inputs_path = runs_dir / "inputs.json"
if inputs_path.exists():
with open(inputs_path, encoding="utf-8") as f:
inputs_data = json.load(f)
state.inputs = inputs_data.get("inputs", {})
return state
def append_log(self, entry: dict[str, Any]) -> None:
"""Append a log entry to the run log."""
entry["timestamp"] = datetime.now(timezone.utc).isoformat()
self.log_entries.append(entry)
runs_dir = self.runs_dir
runs_dir.mkdir(parents=True, exist_ok=True)
with open(runs_dir / "log.jsonl", "a", encoding="utf-8") as f:
f.write(json.dumps(entry) + "\n")
# -- Workflow Engine ------------------------------------------------------
class WorkflowEngine:
"""Orchestrator that loads, validates, and executes workflow definitions."""
def __init__(self, project_root: Path | None = None) -> None:
self.project_root = project_root or Path(".")
self.on_step_start: Any = None # Callable[[str, str], None] | None
def load_workflow(self, source: str | Path) -> WorkflowDefinition:
"""Load a workflow from an installed ID or a local YAML path.
Parameters
----------
source:
Either a workflow ID (looked up in the installed workflows
directory) or a path to a YAML file.
Returns
-------
A parsed ``WorkflowDefinition`` (not yet validated; call
``validate_workflow()`` or ``engine.validate()`` separately).
Raises
------
FileNotFoundError:
If the workflow file cannot be found.
ValueError:
If the workflow YAML is invalid.
"""
path = Path(source).expanduser()
# Try as a direct file path first
if path.suffix.lower() in (".yml", ".yaml") and path.is_file():
return WorkflowDefinition.from_yaml(path)
# Try as an installed workflow ID
installed_path = (
self.project_root
/ ".specify"
/ "workflows"
/ str(source)
/ "workflow.yml"
)
if installed_path.exists():
return WorkflowDefinition.from_yaml(installed_path)
msg = f"Workflow not found: {source}"
raise FileNotFoundError(msg)
def validate(self, definition: WorkflowDefinition) -> list[str]:
"""Validate a workflow definition."""
return validate_workflow(definition)
def execute(
self,
definition: WorkflowDefinition,
inputs: dict[str, Any] | None = None,
run_id: str | None = None,
) -> RunState:
"""Execute a workflow definition.
Parameters
----------
definition:
The validated workflow definition.
inputs:
User-provided input values.
run_id:
Optional run ID (uses SPECKIT_WORKFLOW_RUN_ID when set, otherwise auto-generated).
Returns
-------
The final ``RunState`` after execution completes (or pauses).
"""
from . import STEP_REGISTRY
effective_run_id = run_id
if effective_run_id is None:
env_run_id = os.environ.get("SPECKIT_WORKFLOW_RUN_ID", "").strip()
if env_run_id:
effective_run_id = env_run_id
state = RunState(
run_id=effective_run_id,
workflow_id=definition.id,
project_root=self.project_root,
)
# Persist a copy of the workflow definition so resume can
# reload it even if the original source is no longer available
# (e.g. a local YAML path that was moved or deleted).
run_dir = self.project_root / ".specify" / "workflows" / "runs" / state.run_id
run_dir.mkdir(parents=True, exist_ok=True)
workflow_copy = run_dir / "workflow.yml"
import yaml
with open(workflow_copy, "w", encoding="utf-8") as f:
yaml.safe_dump(definition.data, f, sort_keys=False)
# Resolve inputs
resolved_inputs = self._resolve_inputs(definition, inputs or {})
state.inputs = resolved_inputs
state.status = RunStatus.RUNNING
state.save()
context = StepContext(
inputs=resolved_inputs,
default_integration=definition.default_integration,
default_model=definition.default_model,
default_options=definition.default_options,
project_root=str(self.project_root),
run_id=state.run_id,
)
# Execute steps
try:
self._execute_steps(definition.steps, context, state, STEP_REGISTRY)
except KeyboardInterrupt:
state.status = RunStatus.PAUSED
state.append_log({"event": "workflow_interrupted"})
state.save()
return state
except Exception as exc:
state.status = RunStatus.FAILED
state.append_log({"event": "workflow_failed", "error": str(exc)})
state.save()
raise
if state.status == RunStatus.RUNNING:
state.status = RunStatus.COMPLETED
state.append_log({"event": "workflow_finished", "status": state.status.value})
state.save()
return state
def resume(
self,
run_id: str,
inputs: dict[str, Any] | None = None,
) -> RunState:
"""Resume a paused or failed workflow run.
When ``inputs`` is provided, the values are merged over the run's
persisted inputs and re-resolved through the same typed validation
path used by :meth:`execute`, so the resumed step sees updated
workflow inputs. Keys not supplied keep their persisted values; an
empty/``None`` ``inputs`` leaves the run's inputs unchanged.
"""
state = RunState.load(run_id, self.project_root)
if state.status not in (RunStatus.PAUSED, RunStatus.FAILED):
msg = f"Cannot resume run {run_id!r} with status {state.status.value!r}."
raise ValueError(msg)
# Load the workflow definition — try the persisted copy in the
# run directory first so resume works even if the original
# source (e.g. a local YAML path) is no longer available.
run_dir = self.project_root / ".specify" / "workflows" / "runs" / run_id
run_copy = run_dir / "workflow.yml"
if run_copy.exists():
definition = WorkflowDefinition.from_yaml(run_copy)
else:
definition = self.load_workflow(state.workflow_id)
# Merge any newly-supplied inputs over the persisted ones and
# re-validate through the same typing path as the initial run.
if inputs:
merged = {**state.inputs, **inputs}
state.inputs = self._resolve_inputs(definition, merged)
# Restore context
context = StepContext(
inputs=state.inputs,
steps=state.step_results,
default_integration=definition.default_integration,
default_model=definition.default_model,
default_options=definition.default_options,
project_root=str(self.project_root),
run_id=state.run_id,
)
from . import STEP_REGISTRY
state.status = RunStatus.RUNNING
state.save()
# Resume from the current step — re-execute it so gates
# can prompt interactively again.
remaining_steps = definition.steps[state.current_step_index :]
step_offset = state.current_step_index
try:
self._execute_steps(
remaining_steps, context, state, STEP_REGISTRY,
step_offset=step_offset,
)
except KeyboardInterrupt:
state.status = RunStatus.PAUSED
state.append_log({"event": "workflow_interrupted"})
state.save()
return state
except Exception as exc:
state.status = RunStatus.FAILED
state.append_log({"event": "resume_failed", "error": str(exc)})
state.save()
raise
if state.status == RunStatus.RUNNING:
state.status = RunStatus.COMPLETED
state.append_log({"event": "workflow_finished", "status": state.status.value})
state.save()
return state
def _execute_steps(
self,
steps: list[dict[str, Any]],
context: StepContext,
state: RunState,
registry: dict[str, Any],
*,
step_offset: int = 0,
) -> None:
"""Execute a list of steps sequentially."""
for i, step_config in enumerate(steps):
step_id = step_config.get("id", f"step-{i}")
step_type = step_config.get("type", "command")
state.current_step_id = step_id
if step_offset >= 0:
state.current_step_index = step_offset + i
state.save()
state.append_log(
{"event": "step_started", "step_id": step_id, "type": step_type}
)
# Log progress — use the engine's on_step_start callback if set,
# otherwise stay silent (library-safe default).
label = step_config.get("command", "") or step_type
if self.on_step_start is not None:
self.on_step_start(step_id, label)
step_impl = registry.get(step_type)
if not step_impl:
state.status = RunStatus.FAILED
state.append_log(
{
"event": "step_failed",
"step_id": step_id,
"error": f"Unknown step type: {step_type!r}",
}
)
state.save()
return
result: StepResult = step_impl.execute(step_config, context)
# Record step results — prefer resolved values from step output
step_data = {
"type": step_type,
"integration": result.output.get("integration")
or step_config.get("integration")
or context.default_integration,
"model": result.output.get("model")
or step_config.get("model")
or context.default_model,
"options": result.output.get("options")
or step_config.get("options", {}),
"input": result.output.get("input")
or step_config.get("input", {}),
"output": result.output,
"status": result.status.value,
}
context.steps[step_id] = step_data
state.step_results[step_id] = step_data
state.append_log(
{
"event": "step_completed",
"step_id": step_id,
"status": result.status.value,
}
)
# Handle gate pauses
if result.status == StepStatus.PAUSED:
state.status = RunStatus.PAUSED
state.save()
return
# Handle failures
if result.status == StepStatus.FAILED:
# Gate abort (output.aborted) maps to ABORTED status.
# Aborts are deliberate operator decisions, so
# `continue_on_error` does NOT override them — that flag
# is for transient/expected step failures only.
if result.output.get("aborted"):
state.status = RunStatus.ABORTED
state.append_log(
{
"event": "workflow_aborted",
"step_id": step_id,
}
)
state.save()
return
# `continue_on_error: true` lets the pipeline route
# around the failure instead of halting. The step
# result (including exit_code, stderr, status) is
# still recorded so a downstream `if` or `switch`
# can branch on it (or a `gate` can surface it to the
# operator via message interpolation). Log a single,
# unambiguous event per failure resolution — either
# the run continued past it, or it halted.
#
# Use identity comparison (`is True`) rather than
# truthiness so that only a literal boolean enables
# the behaviour, even if validation was skipped.
# Validation rejects non-bool values at parse time,
# but `WorkflowEngine.execute()` does not auto-validate
# (see `WorkflowEngine.load_workflow`, whose docstring
# explicitly notes "not yet validated; call
# `validate_workflow()` or `engine.validate()`
# separately"), so a caller passing an unvalidated
# definition could otherwise see truthy non-bool
# values like the string `"true"` silently change
# run semantics.
if step_config.get("continue_on_error") is True:
state.append_log(
{
"event": "step_continue_on_error",
"step_id": step_id,
"error": result.error,
}
)
state.save()
continue
state.status = RunStatus.FAILED
state.append_log(
{
"event": "step_failed",
"step_id": step_id,
"error": result.error,
}
)
state.save()
return
# Execute nested steps (from control flow)
# NOTE: Nested steps run with step_offset=-1 so they don't
# update current_step_index. If a nested step pauses,
# resume will re-run the parent step and its nested body.
# A step-path stack for exact nested resume is a future
# enhancement.
if result.next_steps:
self._execute_steps(
result.next_steps, context, state, registry,
step_offset=-1,
)
if state.status in (
RunStatus.PAUSED,
RunStatus.FAILED,
RunStatus.ABORTED,
):
return
# Loop iteration: while/do-while re-evaluate after body
if step_type in ("while", "do-while"):
from .expressions import evaluate_condition
max_iters = step_config.get("max_iterations")
if not isinstance(max_iters, int) or max_iters < 1:
max_iters = 10
condition = step_config.get("condition", False)
for _loop_iter in range(max_iters - 1):
if not evaluate_condition(condition, context):
break
# Namespace nested step IDs per iteration
# so logs and state keys are unique.
# Execute one step at a time and alias each
# result back to the unprefixed key so that
# later steps in the same body and the loop
# condition see the latest values.
for ns_idx, ns in enumerate(result.next_steps):
ns_copy = dict(ns)
orig = ns_copy.get("id")
base_id = orig or f"step-{ns_idx}"
ns_copy["id"] = f"{step_id}:{base_id}:{_loop_iter + 1}"
self._execute_steps(
[ns_copy], context, state, registry,
step_offset=-1,
)
if state.status in (
RunStatus.PAUSED,
RunStatus.FAILED,
RunStatus.ABORTED,
):
return
if orig and ns_copy["id"] in context.steps:
context.steps[orig] = context.steps[ns_copy["id"]]
state.step_results[orig] = context.steps[ns_copy["id"]]
# Fan-out: execute nested step template per item with unique IDs
if step_type == "fan-out":
items = result.output.get("items", [])
template = result.output.get("step_template", {})
if template and items:
fan_out_results = []
for item_idx, item_val in enumerate(result.output["items"]):
context.item = item_val
# Per-item ID: parentId:templateId:index
item_step = dict(template)
base_id = item_step.get("id", "item")
item_step["id"] = f"{step_id}:{base_id}:{item_idx}"
self._execute_steps(
[item_step], context, state, registry,
step_offset=-1,
)
# Collect per-item result for fan-in
item_result = context.steps.get(item_step["id"], {})
fan_out_results.append(item_result.get("output", {}))
if state.status in (
RunStatus.PAUSED,
RunStatus.FAILED,
RunStatus.ABORTED,
):
break
context.item = None
# Preserve original output and add collected results
fan_out_output = dict(result.output)
fan_out_output["results"] = fan_out_results
context.steps[step_id]["output"] = fan_out_output
state.step_results[step_id]["output"] = fan_out_output
if state.status in (
RunStatus.PAUSED,
RunStatus.FAILED,
RunStatus.ABORTED,
):
return
else:
# Empty items or no template — normalize output
result.output["results"] = []
context.steps[step_id]["output"] = result.output
state.step_results[step_id]["output"] = result.output
def _resolve_inputs(
self,
definition: WorkflowDefinition,
provided: dict[str, Any],
) -> dict[str, Any]:
"""Resolve workflow inputs against definitions and provided values."""
resolved: dict[str, Any] = {}
for name, input_def in definition.inputs.items():
if not isinstance(input_def, dict):
continue
if name in provided:
# Resolve sentinels for explicitly-provided values too: a
# caller passing ``{"integration": "auto"}`` (which the
# workflow prompt advertises as a valid value) must be
# treated identically to omitting the input and letting the
# default flow through, so dispatch never sees the literal
# sentinel.
value = self._resolve_default(name, provided[name])
elif "default" in input_def:
value = self._resolve_default(name, input_def["default"])
elif input_def.get("required", False):
msg = f"Required input {name!r} not provided."
raise ValueError(msg)
else:
continue
# When the ``integration`` default could not be resolved against
# project state and falls back to the literal ``"auto"``
# sentinel, strip ``enum`` from the input definition before
# coercion so a workflow that lists specific integrations in
# ``enum`` does not crash at runtime on the sentinel value.
# NOTE: only enum-membership is skipped; ``_coerce_input``
# still enforces the declared ``type`` against the filtered
# definition (``string`` rejects non-strings, ``number`` rejects
# bools and uncoercible values, ``boolean`` rejects non-bools),
# so ill-typed values still fail fast here.
coerce_input_def = input_def
if (
name == "integration"
and value == "auto"
and "enum" in input_def
):
coerce_input_def = {
key: val
for key, val in input_def.items()
if key != "enum"
}
resolved[name] = self._coerce_input(name, value, coerce_input_def)
return resolved
def _resolve_default(self, name: str, default: Any) -> Any:
"""Resolve special default sentinels against project state.
For the ``integration`` input, ``"auto"`` resolves to the integration
recorded in ``.specify/integration.json`` so workflows dispatch to the
AI the project was actually initialized with, instead of a hardcoded
value baked into the workflow YAML.
"""
if name == "integration" and default == "auto":
resolved = self._load_project_integration()
if resolved is not None:
return resolved
return default
def _load_project_integration(self) -> str | None:
"""Read the default integration key from ``.specify/integration.json``.
Delegates parsing and schema validation to
:func:`try_read_integration_json` — the same low-level helper used by
the CLI — so the engine cannot drift from CLI behavior on the parse
path. Returns ``None`` when the file is missing, malformed, or
written by a newer CLI; callers fall back to the literal default.
"""
state, error = try_read_integration_json(self.project_root)
if state is None or error is not None:
return None
return default_integration_key(state)
@staticmethod
def _coerce_input(
name: str, value: Any, input_def: dict[str, Any]
) -> Any:
"""Coerce a provided input value to the declared type."""
input_type = input_def.get("type", "string")
enum_values = input_def.get("enum")
if input_type == "number":
# Reject bools explicitly: ``bool`` is a subclass of ``int`` so
# ``float(True)`` succeeds and would silently coerce a YAML
# authoring mistake like ``type: number`` + ``default: true``
# into ``1``. Fail fast instead.
if isinstance(value, bool):
msg = f"Input {name!r} expected a number, got {value!r}."
raise ValueError(msg)
try:
value = float(value)
if value == int(value):
value = int(value)
except (ValueError, TypeError, OverflowError):
# OverflowError: `int(value)` raises it for an infinite float
# (e.g. a `default: .inf` authoring mistake), which would
# otherwise escape validate_workflow's `except ValueError` and
# break its "return errors, never raise" contract. Surface it as
# the same clean "expected a number" error as NaN does.
msg = f"Input {name!r} expected a number, got {value!r}."
raise ValueError(msg) from None
elif input_type == "boolean":
if isinstance(value, str):
if value.lower() in ("true", "1", "yes"):
value = True
elif value.lower() in ("false", "0", "no"):
value = False
else:
msg = f"Input {name!r} expected a boolean, got {value!r}."
raise ValueError(msg)
elif not isinstance(value, bool):
msg = f"Input {name!r} expected a boolean, got {value!r}."
raise ValueError(msg)
elif input_type == "string":
# Without this, ``type: string`` accepts any Python value
# (numbers, lists, dicts) because nothing else rejects it —
# YAML ``default: 5`` would slip through. Require an actual
# string so authoring mistakes fail at resolve time.
if not isinstance(value, str):
msg = f"Input {name!r} expected a string, got {value!r}."
raise ValueError(msg)
if enum_values is not None and value not in enum_values:
msg = (
f"Input {name!r} value {value!r} not in allowed "
f"values: {enum_values}."
)
raise ValueError(msg)
return value
def list_runs(self) -> list[dict[str, Any]]:
"""List all workflow runs in the project."""
runs_dir = self.project_root / ".specify" / "workflows" / "runs"
if not runs_dir.exists():
return []
runs: list[dict[str, Any]] = []
for run_dir in sorted(runs_dir.iterdir()):
if not run_dir.is_dir():
continue
state_path = run_dir / "state.json"
if state_path.exists():
with open(state_path, encoding="utf-8") as f:
state_data = json.load(f)
runs.append(state_data)
return runs
class WorkflowAbortError(Exception):
"""Raised when a workflow is aborted (e.g., gate rejection)."""