feat(workflows): add continue_on_error step field

Adds an optional `continue_on_error: bool` field on every step.
When set to `true` and the step fails, the engine records the
result (`exit_code`, `stderr` on `steps.<id>.output` plus `status`
as a sibling key on `steps.<id>`) and continues to the next sibling
step instead of halting the run. Downstream `if`, `switch`, or
`gate` steps can then branch on
`{{ steps.<id>.output.exit_code }}` to route the recovery path.

Engine details
--------------
`WorkflowEngine._execute_steps` now consults the step config when a
step returns `StepStatus.FAILED`:

- Gate aborts (`output.aborted`) always halt the run — operator
  decisions take precedence over the flag.
- Otherwise, if `continue_on_error` is the literal `True`, log a
  `step_continue_on_error` event and proceed to the next sibling.
  The runtime check uses identity comparison (`is True`) rather
  than truthiness, so truthy non-bool values like the string
  `"true"` cannot silently change run semantics even if a caller
  bypasses `validate_workflow()`.
- Otherwise, behave as before: log `step_failed`, set
  `RunStatus.FAILED`, and return.

Validation
----------
`_validate_steps` rejects non-bool values for `continue_on_error`.
Coerced strings like `"true"` are not accepted so authoring
mistakes surface at validation time rather than silently changing
run semantics.

Tests
-----
`TestContinueOnError` in `tests/test_workflows.py` (8 tests):
- `test_undeclared_failure_halts_run` — default halt behaviour.
- `test_declared_and_fired_continues_run` — flag + fail → continue.
- `test_declared_but_step_succeeded_is_noop` — flag + success → no-op.
- `test_if_branch_routes_around_failure` — end-to-end recovery.
- `test_gate_abort_still_halts_with_continue_on_error` — abort
  always halts.
- `test_validation_rejects_non_bool_continue_on_error` — `"true"`
  rejected at validation.
- `test_validation_accepts_bool_continue_on_error` — `true`/`false`
  pass cleanly.
- `test_engine_ignores_truthy_non_bool_continue_on_error` —
  defense-in-depth: engine ignores string `"true"` even when
  validation is bypassed.

Rebased onto current upstream/main (post #2664 merge); the new
`TestContinueOnError` class sits immediately after upstream's
`TestContextRunId` so the two feature suites coexist cleanly.

Closes #2591.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
doquanghuy
2026-05-28 01:11:01 +07:00
parent c6afe4cde1
commit d0b9e008a7
3 changed files with 396 additions and 31 deletions

View File

@@ -231,6 +231,20 @@ def _validate_steps(
step_errors = step_impl.validate(step_config) step_errors = step_impl.validate(step_config)
errors.extend(step_errors) errors.extend(step_errors)
# Validate optional `continue_on_error` field. The engine honours
# this on any step that returns FAILED so the pipeline can route
# around the failure via downstream `if`/`switch`/`gate`. The
# field must be a literal boolean — coercion from truthy strings
# is deliberately not supported so authoring mistakes surface
# at validation time rather than silently changing run semantics.
if "continue_on_error" in step_config:
coe = step_config["continue_on_error"]
if not isinstance(coe, bool):
errors.append(
f"Step {step_id!r}: 'continue_on_error' must be a "
f"boolean, got {type(coe).__name__}."
)
# Recursively validate nested steps # Recursively validate nested steps
for nested_key in ("then", "else", "steps"): for nested_key in ("then", "else", "steps"):
nested = step_config.get(nested_key) nested = step_config.get(nested_key)
@@ -622,7 +636,10 @@ class WorkflowEngine:
# Handle failures # Handle failures
if result.status == StepStatus.FAILED: if result.status == StepStatus.FAILED:
# Gate abort (output.aborted) maps to ABORTED status # Gate abort (output.aborted) maps to ABORTED status.
# Aborts are deliberate operator decisions, so
# `continue_on_error` does NOT override them — that flag
# is for transient/expected step failures only.
if result.output.get("aborted"): if result.output.get("aborted"):
state.status = RunStatus.ABORTED state.status = RunStatus.ABORTED
state.append_log( state.append_log(
@@ -631,15 +648,48 @@ class WorkflowEngine:
"step_id": step_id, "step_id": step_id,
} }
) )
else: state.save()
state.status = RunStatus.FAILED return
# `continue_on_error: true` lets the pipeline route
# around the failure instead of halting. The step
# result (including exit_code, stderr, status) is
# still recorded so downstream `if`/`switch`/`gate`
# steps can branch on it. Log a single, unambiguous
# event per failure resolution — either the run
# continued past it, or it halted.
#
# Use identity comparison (`is True`) rather than
# truthiness so that only a literal boolean enables
# the behaviour, even if validation was skipped.
# Validation rejects non-bool values at parse time,
# but `WorkflowEngine.execute()` does not auto-validate
# (see `WorkflowEngine.load_workflow`, whose docstring
# explicitly notes "not yet validated; call
# `validate_workflow()` or `engine.validate()`
# separately"), so a caller passing an unvalidated
# definition could otherwise see truthy non-bool
# values like the string `"true"` silently change
# run semantics.
if step_config.get("continue_on_error") is True:
state.append_log( state.append_log(
{ {
"event": "step_failed", "event": "step_continue_on_error",
"step_id": step_id, "step_id": step_id,
"error": result.error, "error": result.error,
} }
) )
state.save()
continue
state.status = RunStatus.FAILED
state.append_log(
{
"event": "step_failed",
"step_id": step_id,
"error": result.error,
}
)
state.save() state.save()
return return

View File

@@ -2333,6 +2333,303 @@ steps:
assert state.step_results["only-step"]["output"]["stdout"].strip() == "hello" assert state.step_results["only-step"]["output"]["stdout"].strip() == "hello"
# ===== continue_on_error Tests =====
#
# Locks the contract documented in workflows/README.md "Error Handling"
# section: when an executable step fails and `continue_on_error: true`
# is declared, the engine records the step's `output` (with `exit_code`
# and `stderr` from the failure) and its `status` (sibling key on
# `steps.<id>`, not nested under `output`) and continues to the next
# sibling step instead of halting the run. Gate aborts
# (`output.aborted`) still halt regardless of the flag.
class TestContinueOnError:
"""Test the `continue_on_error` step-level field."""
def test_undeclared_failure_halts_run(self, project_dir):
"""Default behaviour (no `continue_on_error`): a failing step
halts the workflow run with `status == FAILED`.
Locks the byte-equivalent default — workflows that do not
declare the flag must behave exactly as before this feature.
"""
from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine
from specify_cli.workflows.base import RunStatus
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "halt-on-fail"
name: "Halt On Fail"
version: "1.0.0"
steps:
- id: fail-step
type: shell
run: "exit 7"
- id: after
type: shell
run: "echo should-not-run"
""")
engine = WorkflowEngine(project_dir)
state = engine.execute(definition)
assert state.status == RunStatus.FAILED
assert "fail-step" in state.step_results
assert state.step_results["fail-step"]["output"]["exit_code"] == 7
# Subsequent step never executes when the flag is absent.
assert "after" not in state.step_results
def test_declared_and_fired_continues_run(self, project_dir):
"""`continue_on_error: true` + failing step: the run keeps
going, the failed step's result is recorded, and the
downstream step runs.
"""
from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine
from specify_cli.workflows.base import RunStatus
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "continue-past-fail"
name: "Continue Past Fail"
version: "1.0.0"
steps:
- id: flaky-step
type: shell
run: "exit 42"
continue_on_error: true
- id: after
type: shell
run: "echo did-run"
""")
engine = WorkflowEngine(project_dir)
state = engine.execute(definition)
assert state.status == RunStatus.COMPLETED
# Failed step's exit_code is preserved so downstream branching
# can inspect it.
assert state.step_results["flaky-step"]["output"]["exit_code"] == 42
assert state.step_results["flaky-step"]["status"] == "failed"
# Downstream step ran successfully.
assert state.step_results["after"]["output"]["exit_code"] == 0
def test_declared_but_step_succeeded_is_noop(self, project_dir):
"""`continue_on_error: true` on a step that succeeds is a
no-op — the flag only changes behaviour on FAILED status.
"""
from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine
from specify_cli.workflows.base import RunStatus
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "flag-but-success"
name: "Flag But Success"
version: "1.0.0"
steps:
- id: ok-step
type: shell
run: "echo ok"
continue_on_error: true
- id: after
type: shell
run: "echo done"
""")
engine = WorkflowEngine(project_dir)
state = engine.execute(definition)
assert state.status == RunStatus.COMPLETED
assert state.step_results["ok-step"]["status"] == "completed"
assert state.step_results["ok-step"]["output"]["exit_code"] == 0
assert state.step_results["after"]["output"]["exit_code"] == 0
def test_if_branch_routes_around_failure(self, project_dir):
"""End-to-end: `continue_on_error` + `if` cleanly routes around
a failure. The recovery branch runs; the success branch does
not.
Mirrors the canonical usage pattern from the original feature
discussion in issue #2591.
"""
from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine
from specify_cli.workflows.base import RunStatus
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "route-around"
name: "Route Around Failure"
version: "1.0.0"
steps:
- id: heavy-thing
type: shell
run: "exit 1"
continue_on_error: true
- id: check-result
type: if
condition: "{{ steps.heavy-thing.output.exit_code != 0 }}"
then:
- id: recovery
type: shell
run: "echo recovery-ran"
else:
- id: happy-path
type: shell
run: "echo happy-path-ran"
""")
engine = WorkflowEngine(project_dir)
state = engine.execute(definition)
assert state.status == RunStatus.COMPLETED
assert "recovery" in state.step_results
assert "happy-path" not in state.step_results
def test_gate_abort_still_halts_with_continue_on_error(
self, project_dir, monkeypatch
):
"""`continue_on_error` does NOT override a deliberate gate
abort. `output.aborted` always halts the run with
`status == ABORTED`.
Aborts are explicit operator decisions; continue_on_error
is for transient/expected step failures only.
"""
from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine
from specify_cli.workflows.base import RunStatus
from specify_cli.workflows.steps.gate import GateStep
from specify_cli.workflows.steps import gate as gate_module
# Force the gate step into interactive mode and feed a "reject"
# choice so the abort path actually runs in the test env
# (default behaviour returns PAUSED when stdin is not a TTY).
# Swap sys.stdin itself for a stub: setattr on the real
# TextIOWrapper's `isatty` method is not assignable under some
# runners (e.g. pytest with capture disabled).
class _TTYStdin:
def isatty(self) -> bool:
return True
monkeypatch.setattr(gate_module.sys, "stdin", _TTYStdin())
monkeypatch.setattr(
GateStep, "_prompt", staticmethod(lambda _msg, _opts: "reject")
)
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "gate-abort-halts"
name: "Gate Abort Halts"
version: "1.0.0"
steps:
- id: gate-step
type: gate
message: "Approve?"
options: [approve, reject]
on_reject: abort
continue_on_error: true
- id: should-not-run
type: shell
run: "echo nope"
""")
engine = WorkflowEngine(project_dir)
state = engine.execute(definition)
assert state.status == RunStatus.ABORTED
assert "should-not-run" not in state.step_results
def test_validation_rejects_non_bool_continue_on_error(self):
"""`continue_on_error` must be a literal boolean; coerced
strings like `"true"` are rejected at validation time so
authoring mistakes surface before execution.
"""
from specify_cli.workflows.engine import (
WorkflowDefinition,
validate_workflow,
)
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "bad-coe"
name: "Bad COE"
version: "1.0.0"
steps:
- id: step-one
type: shell
run: "true"
continue_on_error: "true"
""")
errors = validate_workflow(definition)
assert any(
"continue_on_error" in e and "boolean" in e for e in errors
), errors
def test_validation_accepts_bool_continue_on_error(self):
"""Boolean values pass validation cleanly."""
from specify_cli.workflows.engine import (
WorkflowDefinition,
validate_workflow,
)
for value in (True, False):
yaml_value = "true" if value else "false"
definition = WorkflowDefinition.from_string(f"""
schema_version: "1.0"
workflow:
id: "good-coe"
name: "Good COE"
version: "1.0.0"
steps:
- id: step-one
type: shell
run: "true"
continue_on_error: {yaml_value}
""")
errors = validate_workflow(definition)
assert errors == [], errors
def test_engine_ignores_truthy_non_bool_continue_on_error(self, project_dir):
"""Defense-in-depth: even if a caller bypasses
`validate_workflow()` and feeds the engine a definition with
`continue_on_error: "true"` (a string), the engine must NOT
honour the flag — only a literal boolean enables the
behaviour. `WorkflowEngine.execute()` does not auto-validate
(the `WorkflowEngine.load_workflow` docstring explicitly
notes the definition is "not yet validated; call
`validate_workflow()` or `engine.validate()` separately"),
so the engine guards against truthy non-bool values itself
via an identity check rather than truthiness.
"""
from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine
from specify_cli.workflows.base import RunStatus
# Bypass `validate_workflow()` — execute() is what would
# be called by a caller that skipped validation.
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "string-coe"
name: "String COE"
version: "1.0.0"
steps:
- id: fail-step
type: shell
run: "exit 1"
continue_on_error: "true"
- id: should-not-run
type: shell
run: "echo should-not-run"
""")
engine = WorkflowEngine(project_dir)
state = engine.execute(definition)
# String "true" is truthy but not a literal boolean, so the
# engine must treat the step as a halting failure.
assert state.status == RunStatus.FAILED
assert "should-not-run" not in state.step_results
# ===== State Persistence Tests ===== # ===== State Persistence Tests =====
class TestRunState: class TestRunState:

View File

@@ -219,6 +219,51 @@ Aggregate results from fan-out steps:
output: {} output: {}
``` ```
## Error Handling
By default, any step that ends in `StepStatus.FAILED` at runtime halts
the entire run — most commonly a `shell` or `command` step exiting
non-zero, but also any other runtime error raised during step
execution. (Invalid workflow definitions are rejected up-front by
`specify workflow run` before the run even starts, so structural
validation failures never reach this code path.) Set
`continue_on_error: true` on a step to record its result and continue
to the next sibling step instead. When the failure was a non-zero
exit, the exit code remains available on
`steps.<id>.output.exit_code` so downstream `if`, `switch`, or `gate`
steps can branch on it:
```yaml
- id: heavy-thing
type: command
integration: claude
command: speckit.heavy-thing
continue_on_error: true
- id: check-result
type: if
condition: "{{ steps.heavy-thing.output.exit_code != 0 }}"
then:
- id: review
type: gate
message: "Step failed (exit {{ steps.heavy-thing.output.exit_code }}). Retry or skip?"
on_reject: skip
else:
- id: next-thing
command: speckit.next-thing
```
**Notes:**
- The field must be a literal boolean (`true` / `false`); coerced
strings like `"true"` are rejected at validation time.
- Gate aborts (`on_reject: abort` chosen by the operator) always halt
the run — `continue_on_error` does not override them. The flag is
for transient/expected step failures, not for overriding deliberate
operator decisions.
- When the flag is omitted, behaviour is byte-equivalent to before
this feature.
## Expressions ## Expressions
Workflow definitions use `{{ expression }}` syntax for dynamic values: Workflow definitions use `{{ expression }}` syntax for dynamic values:
@@ -239,33 +284,6 @@ message: "{{ status | default('pending') }}"
Supported filters: `default`, `join`, `contains`, `map`. Supported filters: `default`, `join`, `contains`, `map`.
### Runtime Context
`{{ context.* }}` exposes engine-managed runtime metadata for the
current run:
| Variable | Description |
|----------|-------------|
| `context.run_id` | The current workflow run id (the same value Spec Kit prints as `Run ID:` at the end of `workflow run`). Auto-generated runs are 8-character hex from `uuid4`; operator-supplied ids may be any alphanumeric string with hyphens or underscores. Empty string outside a run context. |
```yaml
# Stamp telemetry events with the run id for cross-system join.
- id: emit-event
type: shell
run: 'echo "{\"run_id\":\"{{ context.run_id }}\",\"event\":\"started\"}" >> events.jsonl'
# Per-run scratch directory.
- id: prep-scratch
type: shell
run: 'mkdir -p /tmp/run-{{ context.run_id }}'
# Pass run id into a command for artifact metadata.
- id: tag-artifact
command: speckit.specify
input:
args: "{{ context.run_id }}"
```
## Input Types ## Input Types
Workflow inputs are type-checked and coerced from CLI string values: Workflow inputs are type-checked and coerced from CLI string values: