Files
github-spec-kit/tests/test_workflows.py
One-TheOnly 8e5643d4ff fix(cursor-agent): enable headless CLI dispatch end-to-end (-p --trust --approve-mcps --force + Windows .cmd shim resolution) (#2631)
* fix(cursor-agent): enable CLI dispatch via ``-p --trust`` headless mode

Restores the ability for ``specify workflow run`` to dispatch the
cursor-agent CLI, complementing the existing in-IDE skill flow.
Without this fix, ``specify workflow run speckit --input
integration=cursor-agent ...`` fails with a misleading
``CLI not found or not installed`` error even when the CLI is
installed (since cursor-agent had ``requires_cli=False`` and an
unset ``build_exec_args``).

The cursor-agent CLI (>= 2026.05.16) supports headless execution
via ``-p`` (print mode with full tool access including write/shell)
and ``--trust`` (bypass Workspace Trust prompt). Without ``--trust``
the CLI exits non-zero in non-TTY contexts (verified locally).

Changes to ``src/specify_cli/integrations/cursor_agent/__init__.py``:

* ``config.requires_cli``: ``False`` -> ``True``
* ``config.install_url``: ``None`` -> Cursor CLI docs URL
* Override ``build_exec_args()`` to emit
  ``[cursor-agent, -p, --trust, <prompt>, ...]``
  with optional ``--model`` and ``--output-format json`` flags,
  mirroring the shape used by ``claude``/``codex``/``gemini``.

Tests:

* 34 existing cursor-agent tests still pass.
* 6 new tests in ``TestCursorAgentCliDispatch`` pin
  ``requires_cli``, ``install_url``, and the exact argv shape
  (default, text-output, with-model, and the hyphenated skill
  invocation form ``/speckit-<name>``).
* Full repo: 1085 / 1085 passed, no regressions.

Fixes #2629

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(integrations): resolve ``.cmd``/``.bat`` shims before subprocess.run

On Windows, ``shutil.which`` honors ``PATHEXT`` and locates wrappers
like ``cursor-agent.cmd`` and ``codex.cmd``, but Python's
``subprocess.run`` calls ``CreateProcess`` which does **not** consult
``PATHEXT`` and therefore fails with ``WinError 2`` on a bare argv
like ``[cursor-agent, ...]``.

Resolve ``exec_args[0]`` via ``shutil.which`` in
``IntegrationBase.dispatch_command`` so ``.cmd``/``.bat`` shims work
transparently. On POSIX this is a no-op for absolute paths and a
harmless lookup otherwise.

Verified locally on Windows 10 + cursor-agent 2026.05.16:
without this fix, ``specify workflow run speckit --input
integration=cursor-agent`` fails with ``FileNotFoundError`` even
after the cursor-agent integration starts producing valid exec
args (per the prior commit on this branch).

Tests:

* New: 2 cursor-agent tests pin the shim-resolution + passthrough
  behavior (``test_dispatch_command_resolves_cmd_shim_for_subprocess``
  and ``test_dispatch_command_passthrough_when_shutil_which_finds_nothing``).
* Updated: ``tests/test_workflows.py::TestCommandStep::test_dispatch_with_mock_cli``
  was mocking ``shutil.which`` only at the ``command`` step level
  and not at the ``base`` level, which made it environment-sensitive
  (fails locally when the real ``claude`` CLI is on PATH).  Added the
  matching base-level patch and updated the argv-assertion to reflect
  the resolved path. ``test_dispatch_failure_returns_failed_status``
  gets the same patch for consistency.
* Full repo: 2867 passed, 0 regression from this PR. The 12 remaining
  pre-existing failures are unrelated Windows ``symlink`` privilege
  failures (``WinError 1314``) on a non-admin Windows runner.

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(cursor-agent): inject --approve-mcps --force for headless MCP/tool access

The previous commit (1c55988) wired up ``-p --trust`` so the CLI launches
in headless mode without the Workspace Trust prompt, but that alone is
not enough to let ``specify workflow run`` drive a real speckit feature
end-to-end with cursor-agent on Windows. Two more flags are required:

* ``--approve-mcps``: without it, every MCP server configured in
  ``.cursor/mcp.json`` stays ``not loaded (needs approval)``, and any
  tool call against them is silently dropped. We hit this immediately
  trying to read a DingTalk PRD from a remote MCP server during the
  ``/speckit-specify`` step.
* ``--force``: without it, the agent halts on the first tool-call
  approval prompt (the tool call gets rejected and the workflow exits
  non-zero with a misleading message). With ``--force`` cursor-agent
  matches the implicit "trusted environment" semantics that ``claude -p``
  and ``codex --exec`` already have by default -- which is the right
  semantics for an unattended ``specify workflow run`` invocation.

Verified end-to-end on Windows 10 + cursor-agent 2026.05.16-0338208:

* ``cursor-agent -p --trust --approve-mcps --force --output-format text``
  + a ``/speckit-specify`` prompt that included a DingTalk URL produced
  a full spec.md (31.5 KB) plus checklists/requirements.md in ~10.7 min,
  reading the source PRD through the ``dingtalk-doc`` remote MCP server,
  deciding the ``specs/`` subpath itself, and updating
  ``.specify/feature.json`` and ``specs/menu-dictionary.md`` along the
  way -- no human-in-the-loop, no source PRD ever touched the filesystem.
* Without ``--approve-mcps`` the same prompt errors with the tool call
  rejected message; without ``--force`` the agent stops at the first
  non-MCP tool call.

Tests:

* ``test_build_exec_args_*`` updated to pin the new four-flag prefix.
* New ``test_build_exec_args_contains_mandatory_headless_flags`` asserts
  the four flags are always present together.
* ``test_dispatch_command_resolves_cmd_shim_for_subprocess`` updated to
  match the new argv layout.
* All 43 cursor-agent tests pass; no other tests touched.

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(cursor-agent): express dispatch support via build_exec_args() instead of requires_cli

Co-authored-by: Cursor <cursoragent@cursor.com>

* test(cursor-agent): use urlparse hostname check and cover dispatch without requires_cli

Co-authored-by: Cursor <cursoragent@cursor.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: 刘一 <liuyi@oureman.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-06-04 09:48:33 -05:00

3254 lines
111 KiB
Python

"""Tests for the workflow engine subsystem.
Covers:
- Step registry & auto-discovery
- Base classes (StepBase, StepContext, StepResult)
- Expression engine
- All 10 built-in step types
- Workflow definition loading & validation
- Workflow engine execution & state persistence
- Workflow catalog & registry
"""
from __future__ import annotations
import json
import os
import shutil
import tempfile
from pathlib import Path
import pytest
import yaml
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def temp_dir():
"""Create a temporary directory for tests."""
tmpdir = tempfile.mkdtemp()
yield Path(tmpdir)
shutil.rmtree(tmpdir)
@pytest.fixture
def project_dir(temp_dir):
"""Create a mock spec-kit project with .specify/ directory."""
specify_dir = temp_dir / ".specify"
specify_dir.mkdir()
(specify_dir / "workflows").mkdir()
return temp_dir
@pytest.fixture
def sample_workflow_yaml():
"""Return a valid minimal workflow YAML string."""
return """
schema_version: "1.0"
workflow:
id: "test-workflow"
name: "Test Workflow"
version: "1.0.0"
description: "A test workflow"
inputs:
spec:
type: string
required: true
scope:
type: string
default: "full"
steps:
- id: step-one
command: speckit.specify
input:
args: "{{ inputs.spec }}"
- id: step-two
command: speckit.plan
input:
args: "{{ steps.step-one.output.command }}"
"""
@pytest.fixture
def sample_workflow_file(project_dir, sample_workflow_yaml):
"""Write a sample workflow YAML to a file and return its path."""
wf_dir = project_dir / ".specify" / "workflows" / "test-workflow"
wf_dir.mkdir(parents=True, exist_ok=True)
wf_path = wf_dir / "workflow.yml"
wf_path.write_text(sample_workflow_yaml, encoding="utf-8")
return wf_path
# ===== Step Registry Tests =====
class TestStepRegistry:
"""Test STEP_REGISTRY and auto-discovery."""
def test_registry_populated(self):
from specify_cli.workflows import STEP_REGISTRY
assert len(STEP_REGISTRY) >= 10
def test_all_step_types_registered(self):
from specify_cli.workflows import STEP_REGISTRY
expected = {
"command", "shell", "prompt", "gate", "if", "switch",
"while", "do-while", "fan-out", "fan-in",
}
assert expected.issubset(set(STEP_REGISTRY.keys()))
def test_get_step_type(self):
from specify_cli.workflows import get_step_type
step = get_step_type("command")
assert step is not None
assert step.type_key == "command"
def test_get_step_type_missing(self):
from specify_cli.workflows import get_step_type
assert get_step_type("nonexistent") is None
def test_register_step_duplicate_raises(self):
from specify_cli.workflows import _register_step
from specify_cli.workflows.steps.command import CommandStep
with pytest.raises(KeyError, match="already registered"):
_register_step(CommandStep())
def test_register_step_empty_key_raises(self):
from specify_cli.workflows import _register_step
from specify_cli.workflows.base import StepBase, StepResult
class EmptyStep(StepBase):
type_key = ""
def execute(self, config, context):
return StepResult()
with pytest.raises(ValueError, match="empty type_key"):
_register_step(EmptyStep())
# ===== Base Classes Tests =====
class TestBaseClasses:
"""Test StepBase, StepContext, StepResult."""
def test_step_context_defaults(self):
from specify_cli.workflows.base import StepContext
ctx = StepContext()
assert ctx.inputs == {}
assert ctx.steps == {}
assert ctx.item is None
assert ctx.fan_in == {}
assert ctx.default_integration is None
def test_step_context_with_data(self):
from specify_cli.workflows.base import StepContext
ctx = StepContext(
inputs={"name": "test"},
default_integration="claude",
default_model="sonnet-4",
)
assert ctx.inputs == {"name": "test"}
assert ctx.default_integration == "claude"
assert ctx.default_model == "sonnet-4"
def test_step_result_defaults(self):
from specify_cli.workflows.base import StepResult, StepStatus
result = StepResult()
assert result.status == StepStatus.COMPLETED
assert result.output == {}
assert result.next_steps == []
assert result.error is None
def test_step_status_values(self):
from specify_cli.workflows.base import StepStatus
assert StepStatus.PENDING == "pending"
assert StepStatus.RUNNING == "running"
assert StepStatus.COMPLETED == "completed"
assert StepStatus.FAILED == "failed"
assert StepStatus.SKIPPED == "skipped"
assert StepStatus.PAUSED == "paused"
def test_run_status_values(self):
from specify_cli.workflows.base import RunStatus
assert RunStatus.CREATED == "created"
assert RunStatus.RUNNING == "running"
assert RunStatus.PAUSED == "paused"
assert RunStatus.COMPLETED == "completed"
assert RunStatus.FAILED == "failed"
assert RunStatus.ABORTED == "aborted"
# ===== Expression Engine Tests =====
class TestExpressions:
"""Test sandboxed expression evaluator."""
def test_simple_variable(self):
from specify_cli.workflows.expressions import evaluate_expression
from specify_cli.workflows.base import StepContext
ctx = StepContext(inputs={"name": "login"})
assert evaluate_expression("{{ inputs.name }}", ctx) == "login"
def test_step_output_reference(self):
from specify_cli.workflows.expressions import evaluate_expression
from specify_cli.workflows.base import StepContext
ctx = StepContext(
steps={"specify": {"output": {"file": "spec.md"}}}
)
assert evaluate_expression("{{ steps.specify.output.file }}", ctx) == "spec.md"
def test_string_interpolation(self):
from specify_cli.workflows.expressions import evaluate_expression
from specify_cli.workflows.base import StepContext
ctx = StepContext(inputs={"name": "login"})
result = evaluate_expression("Feature: {{ inputs.name }} done", ctx)
assert result == "Feature: login done"
def test_comparison_equals(self):
from specify_cli.workflows.expressions import evaluate_expression
from specify_cli.workflows.base import StepContext
ctx = StepContext(inputs={"scope": "full"})
assert evaluate_expression("{{ inputs.scope == 'full' }}", ctx) is True
assert evaluate_expression("{{ inputs.scope == 'partial' }}", ctx) is False
def test_comparison_not_equals(self):
from specify_cli.workflows.expressions import evaluate_expression
from specify_cli.workflows.base import StepContext
ctx = StepContext(
steps={"run-tests": {"output": {"exit_code": 1}}}
)
result = evaluate_expression("{{ steps.run-tests.output.exit_code != 0 }}", ctx)
assert result is True
def test_numeric_comparison(self):
from specify_cli.workflows.expressions import evaluate_expression
from specify_cli.workflows.base import StepContext
ctx = StepContext(
steps={"plan": {"output": {"task_count": 7}}}
)
assert evaluate_expression("{{ steps.plan.output.task_count > 5 }}", ctx) is True
assert evaluate_expression("{{ steps.plan.output.task_count < 5 }}", ctx) is False
def test_boolean_and(self):
from specify_cli.workflows.expressions import evaluate_expression
from specify_cli.workflows.base import StepContext
ctx = StepContext(inputs={"a": True, "b": True})
assert evaluate_expression("{{ inputs.a and inputs.b }}", ctx) is True
def test_boolean_or(self):
from specify_cli.workflows.expressions import evaluate_expression
from specify_cli.workflows.base import StepContext
ctx = StepContext(inputs={"a": False, "b": True})
assert evaluate_expression("{{ inputs.a or inputs.b }}", ctx) is True
def test_filter_default(self):
from specify_cli.workflows.expressions import evaluate_expression
from specify_cli.workflows.base import StepContext
ctx = StepContext()
assert evaluate_expression("{{ inputs.missing | default('fallback') }}", ctx) == "fallback"
def test_filter_join(self):
from specify_cli.workflows.expressions import evaluate_expression
from specify_cli.workflows.base import StepContext
ctx = StepContext(inputs={"tags": ["a", "b", "c"]})
assert evaluate_expression("{{ inputs.tags | join(', ') }}", ctx) == "a, b, c"
def test_filter_contains(self):
from specify_cli.workflows.expressions import evaluate_expression
from specify_cli.workflows.base import StepContext
ctx = StepContext(inputs={"text": "hello world"})
assert evaluate_expression("{{ inputs.text | contains('world') }}", ctx) is True
def test_condition_evaluation(self):
from specify_cli.workflows.expressions import evaluate_condition
from specify_cli.workflows.base import StepContext
ctx = StepContext(inputs={"ready": True})
assert evaluate_condition("{{ inputs.ready }}", ctx) is True
assert evaluate_condition("{{ inputs.missing }}", ctx) is False
def test_non_string_passthrough(self):
from specify_cli.workflows.expressions import evaluate_expression
from specify_cli.workflows.base import StepContext
ctx = StepContext()
assert evaluate_expression(42, ctx) == 42
assert evaluate_expression(None, ctx) is None
def test_string_literal(self):
from specify_cli.workflows.expressions import evaluate_expression
from specify_cli.workflows.base import StepContext
ctx = StepContext()
assert evaluate_expression("{{ 'hello' }}", ctx) == "hello"
def test_numeric_literal(self):
from specify_cli.workflows.expressions import evaluate_expression
from specify_cli.workflows.base import StepContext
ctx = StepContext()
assert evaluate_expression("{{ 42 }}", ctx) == 42
def test_boolean_literal(self):
from specify_cli.workflows.expressions import evaluate_expression
from specify_cli.workflows.base import StepContext
ctx = StepContext()
assert evaluate_expression("{{ true }}", ctx) is True
assert evaluate_expression("{{ false }}", ctx) is False
def test_list_indexing(self):
from specify_cli.workflows.expressions import evaluate_expression
from specify_cli.workflows.base import StepContext
ctx = StepContext(
steps={"tasks": {"output": {"task_list": [{"file": "a.md"}, {"file": "b.md"}]}}}
)
result = evaluate_expression("{{ steps.tasks.output.task_list[0].file }}", ctx)
assert result == "a.md"
def test_context_run_id_resolves(self):
"""``{{ context.run_id }}`` resolves to ``StepContext.run_id``.
Locks the contract from issue #2590: workflow templates can
reference the engine-assigned run id for telemetry, artifact
metadata, or per-run scratch isolation.
"""
from specify_cli.workflows.expressions import evaluate_expression
from specify_cli.workflows.base import StepContext
ctx = StepContext(run_id="a1b2c3d4")
assert evaluate_expression("{{ context.run_id }}", ctx) == "a1b2c3d4"
def test_context_run_id_defaults_to_empty_when_unset(self):
"""``{{ context.run_id }}`` resolves to ``""`` when no run is
active (dry-run, validation, ad-hoc evaluator usage) rather
than raising — workflows referencing the variable never error
outside a run context.
"""
from specify_cli.workflows.expressions import evaluate_expression
from specify_cli.workflows.base import StepContext
# No run_id set on the context.
ctx = StepContext()
assert evaluate_expression("{{ context.run_id }}", ctx) == ""
def test_context_run_id_string_interpolation(self):
"""Run id interpolates inside a larger template string — the
common pattern for stamping shell commands and artifact paths
with the run id.
"""
from specify_cli.workflows.expressions import evaluate_expression
from specify_cli.workflows.base import StepContext
ctx = StepContext(run_id="deadbeef")
result = evaluate_expression("RUN_ID={{ context.run_id }}", ctx)
assert result == "RUN_ID=deadbeef"
# ===== Integration Dispatch Tests =====
class TestBuildExecArgs:
"""Test build_exec_args for CLI-based integrations."""
def test_claude_exec_args(self):
from specify_cli.integrations.claude import ClaudeIntegration
impl = ClaudeIntegration()
args = impl.build_exec_args("do stuff", model="sonnet-4")
assert args[0] == "claude"
assert args[1] == "-p"
assert args[2] == "do stuff"
assert "--model" in args
assert "sonnet-4" in args
assert "--output-format" in args
def test_gemini_exec_args(self):
from specify_cli.integrations.gemini import GeminiIntegration
impl = GeminiIntegration()
args = impl.build_exec_args("do stuff", model="gemini-2.5-pro")
assert args[0] == "gemini"
assert args[1] == "-p"
assert "-m" in args
assert "gemini-2.5-pro" in args
def test_codex_exec_args(self):
from specify_cli.integrations.codex import CodexIntegration
impl = CodexIntegration()
args = impl.build_exec_args("do stuff")
assert args[0] == "codex"
assert args[1] == "exec"
assert args[2] == "do stuff"
assert "--json" in args
def test_copilot_exec_args(self, monkeypatch):
monkeypatch.delenv("SPECKIT_COPILOT_ALLOW_ALL_TOOLS", raising=False)
monkeypatch.delenv("SPECKIT_ALLOW_ALL_TOOLS", raising=False)
from specify_cli.integrations.copilot import CopilotIntegration
impl = CopilotIntegration()
args = impl.build_exec_args("do stuff", model="claude-sonnet-4-20250514")
expected_exec = "copilot.cmd" if os.name == "nt" else "copilot"
assert args[0] == expected_exec
assert "-p" in args
assert "--yolo" in args
assert "--model" in args
def test_copilot_new_env_var_disables_yolo(self, monkeypatch):
monkeypatch.setenv("SPECKIT_COPILOT_ALLOW_ALL_TOOLS", "0")
monkeypatch.delenv("SPECKIT_ALLOW_ALL_TOOLS", raising=False)
from specify_cli.integrations.copilot import CopilotIntegration
impl = CopilotIntegration()
args = impl.build_exec_args("do stuff")
assert "--yolo" not in args
def test_copilot_deprecated_env_var_still_honoured(self, monkeypatch):
monkeypatch.delenv("SPECKIT_COPILOT_ALLOW_ALL_TOOLS", raising=False)
monkeypatch.setenv("SPECKIT_ALLOW_ALL_TOOLS", "0")
import warnings
from specify_cli.integrations.copilot import CopilotIntegration
impl = CopilotIntegration()
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
args = impl.build_exec_args("do stuff")
assert "--yolo" not in args
assert any(
"SPECKIT_ALLOW_ALL_TOOLS is deprecated" in str(x.message)
and issubclass(x.category, UserWarning)
for x in w
)
def test_copilot_new_env_var_takes_precedence(self, monkeypatch):
monkeypatch.setenv("SPECKIT_COPILOT_ALLOW_ALL_TOOLS", "1")
monkeypatch.setenv("SPECKIT_ALLOW_ALL_TOOLS", "0")
from specify_cli.integrations.copilot import CopilotIntegration
impl = CopilotIntegration()
args = impl.build_exec_args("do stuff")
assert "--yolo" in args
def test_ide_only_returns_none(self):
from specify_cli.integrations.windsurf import WindsurfIntegration
impl = WindsurfIntegration()
assert impl.build_exec_args("test") is None
def test_no_model_omits_flag(self):
from specify_cli.integrations.claude import ClaudeIntegration
impl = ClaudeIntegration()
args = impl.build_exec_args("do stuff", model=None)
assert "--model" not in args
def test_no_json_omits_flag(self):
from specify_cli.integrations.claude import ClaudeIntegration
impl = ClaudeIntegration()
args = impl.build_exec_args("do stuff", output_json=False)
assert "--output-format" not in args
# ===== Step Type Tests =====
class TestCommandStep:
"""Test the command step type."""
def test_execute_basic(self):
from unittest.mock import patch
from specify_cli.workflows.steps.command import CommandStep
from specify_cli.workflows.base import StepContext, StepStatus
step = CommandStep()
ctx = StepContext(
inputs={"name": "login"},
default_integration="claude",
)
config = {
"id": "test",
"command": "speckit.specify",
"input": {"args": "{{ inputs.name }}"},
}
with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None):
result = step.execute(config, ctx)
assert result.status == StepStatus.FAILED
assert result.output["command"] == "speckit.specify"
assert result.output["integration"] == "claude"
assert result.output["input"]["args"] == "login"
def test_validate_missing_command(self):
from specify_cli.workflows.steps.command import CommandStep
step = CommandStep()
errors = step.validate({"id": "test"})
assert any("missing 'command'" in e for e in errors)
def test_step_override_integration(self):
from unittest.mock import patch
from specify_cli.workflows.steps.command import CommandStep
from specify_cli.workflows.base import StepContext
step = CommandStep()
ctx = StepContext(default_integration="claude")
config = {
"id": "test",
"command": "speckit.plan",
"integration": "gemini",
"input": {},
}
with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None):
result = step.execute(config, ctx)
assert result.output["integration"] == "gemini"
def test_step_override_model(self):
from unittest.mock import patch
from specify_cli.workflows.steps.command import CommandStep
from specify_cli.workflows.base import StepContext
step = CommandStep()
ctx = StepContext(default_model="sonnet-4")
config = {
"id": "test",
"command": "speckit.implement",
"model": "opus-4",
"input": {},
}
with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None):
result = step.execute(config, ctx)
assert result.output["model"] == "opus-4"
def test_options_merge(self):
from unittest.mock import patch
from specify_cli.workflows.steps.command import CommandStep
from specify_cli.workflows.base import StepContext
step = CommandStep()
ctx = StepContext(default_options={"max-tokens": 8000})
config = {
"id": "test",
"command": "speckit.plan",
"options": {"thinking-budget": 32768},
"input": {},
}
with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None):
result = step.execute(config, ctx)
assert result.output["options"]["max-tokens"] == 8000
assert result.output["options"]["thinking-budget"] == 32768
def test_dispatch_not_attempted_without_cli(self):
"""When the CLI tool is not installed, step should fail."""
from unittest.mock import patch
from specify_cli.workflows.steps.command import CommandStep
from specify_cli.workflows.base import StepContext, StepStatus
step = CommandStep()
ctx = StepContext(
inputs={"name": "login"},
default_integration="claude",
project_root="/tmp",
)
config = {
"id": "test",
"command": "speckit.specify",
"input": {"args": "{{ inputs.name }}"},
}
with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None):
result = step.execute(config, ctx)
assert result.status == StepStatus.FAILED
assert result.output["dispatched"] is False
assert result.error is not None
def test_dispatch_with_mock_cli(self, tmp_path, monkeypatch):
"""When the CLI is installed, dispatch invokes the command by name."""
from unittest.mock import patch, MagicMock
from specify_cli.workflows.steps.command import CommandStep
from specify_cli.workflows.base import StepContext, StepStatus
step = CommandStep()
ctx = StepContext(
inputs={"name": "login"},
default_integration="claude",
project_root=str(tmp_path),
)
config = {
"id": "test",
"command": "speckit.specify",
"input": {"args": "{{ inputs.name }}"},
}
mock_result = MagicMock()
mock_result.returncode = 0
mock_result.stdout = '{"result": "done"}'
mock_result.stderr = ""
with patch("specify_cli.workflows.steps.command.shutil.which", return_value="/usr/local/bin/claude"), \
patch("specify_cli.integrations.base.shutil.which", return_value="/usr/local/bin/claude"), \
patch("subprocess.run", return_value=mock_result) as mock_run:
result = step.execute(config, ctx)
assert result.status == StepStatus.COMPLETED
assert result.output["dispatched"] is True
assert result.output["exit_code"] == 0
# Verify the CLI was called with the resolved path (via shutil.which,
# which honors PATHEXT for ``.cmd``/``.bat`` shims on Windows), then
# ``-p`` and the skill invocation.
call_args = mock_run.call_args
assert call_args[0][0][0] == "/usr/local/bin/claude"
assert call_args[0][0][1] == "-p"
# Claude is a SkillsIntegration so uses /speckit-specify
assert "/speckit-specify login" in call_args[0][0][2]
def test_dispatch_failure_returns_failed_status(self, tmp_path):
"""When the CLI exits non-zero, the step should fail."""
from unittest.mock import patch, MagicMock
from specify_cli.workflows.steps.command import CommandStep
from specify_cli.workflows.base import StepContext, StepStatus
step = CommandStep()
ctx = StepContext(
inputs={},
default_integration="claude",
project_root=str(tmp_path),
)
config = {
"id": "test",
"command": "speckit.specify",
"input": {"args": "test"},
}
mock_result = MagicMock()
mock_result.returncode = 1
mock_result.stdout = ""
mock_result.stderr = "API error"
with patch("specify_cli.workflows.steps.command.shutil.which", return_value="/usr/local/bin/claude"), \
patch("specify_cli.integrations.base.shutil.which", return_value="/usr/local/bin/claude"), \
patch("subprocess.run", return_value=mock_result):
result = step.execute(config, ctx)
assert result.status == StepStatus.FAILED
assert result.output["dispatched"] is True
assert result.output["exit_code"] == 1
class TestPromptStep:
"""Test the prompt step type."""
def test_execute_basic(self):
from unittest.mock import patch
from specify_cli.workflows.steps.prompt import PromptStep
from specify_cli.workflows.base import StepContext, StepStatus
step = PromptStep()
ctx = StepContext(
inputs={"file": "auth.py"},
default_integration="claude",
)
config = {
"id": "review",
"type": "prompt",
"prompt": "Review {{ inputs.file }} for security issues",
}
with patch("specify_cli.workflows.steps.prompt.shutil.which", return_value=None):
result = step.execute(config, ctx)
assert result.status == StepStatus.FAILED
assert result.output["prompt"] == "Review auth.py for security issues"
assert result.output["integration"] == "claude"
assert result.output["dispatched"] is False
def test_execute_with_step_integration(self):
from unittest.mock import patch
from specify_cli.workflows.steps.prompt import PromptStep
from specify_cli.workflows.base import StepContext
step = PromptStep()
ctx = StepContext(default_integration="claude")
config = {
"id": "review",
"type": "prompt",
"prompt": "Summarize the codebase",
"integration": "gemini",
}
with patch("specify_cli.workflows.steps.prompt.shutil.which", return_value=None):
result = step.execute(config, ctx)
assert result.output["integration"] == "gemini"
def test_execute_with_model(self):
from unittest.mock import patch
from specify_cli.workflows.steps.prompt import PromptStep
from specify_cli.workflows.base import StepContext
step = PromptStep()
ctx = StepContext(default_integration="claude", default_model="sonnet-4")
config = {
"id": "review",
"type": "prompt",
"prompt": "hello",
"model": "opus-4",
}
with patch("specify_cli.workflows.steps.prompt.shutil.which", return_value=None):
result = step.execute(config, ctx)
assert result.output["model"] == "opus-4"
def test_dispatch_with_mock_cli(self, tmp_path):
from unittest.mock import patch, MagicMock
from specify_cli.workflows.steps.prompt import PromptStep
from specify_cli.workflows.base import StepContext, StepStatus
step = PromptStep()
ctx = StepContext(
default_integration="claude",
project_root=str(tmp_path),
)
config = {
"id": "ask",
"type": "prompt",
"prompt": "Explain this code",
}
mock_result = MagicMock()
mock_result.returncode = 0
mock_result.stdout = "Here is the explanation"
mock_result.stderr = ""
with patch("specify_cli.workflows.steps.prompt.shutil.which", return_value="/usr/local/bin/claude"), \
patch("subprocess.run", return_value=mock_result):
result = step.execute(config, ctx)
assert result.status == StepStatus.COMPLETED
assert result.output["dispatched"] is True
assert result.output["exit_code"] == 0
def test_validate_missing_prompt(self):
from specify_cli.workflows.steps.prompt import PromptStep
step = PromptStep()
errors = step.validate({"id": "test"})
assert any("missing 'prompt'" in e for e in errors)
def test_validate_valid(self):
from specify_cli.workflows.steps.prompt import PromptStep
step = PromptStep()
errors = step.validate({"id": "test", "prompt": "do something"})
assert errors == []
class TestShellStep:
"""Test the shell step type."""
def test_execute_echo(self):
from specify_cli.workflows.steps.shell import ShellStep
from specify_cli.workflows.base import StepContext, StepStatus
step = ShellStep()
ctx = StepContext()
config = {"id": "test", "run": "echo hello"}
result = step.execute(config, ctx)
assert result.status == StepStatus.COMPLETED
assert result.output["exit_code"] == 0
assert "hello" in result.output["stdout"]
def test_execute_failure(self):
from specify_cli.workflows.steps.shell import ShellStep
from specify_cli.workflows.base import StepContext, StepStatus
step = ShellStep()
ctx = StepContext()
config = {"id": "test", "run": "exit 1"}
result = step.execute(config, ctx)
assert result.status == StepStatus.FAILED
assert result.output["exit_code"] == 1
assert result.error is not None
def test_validate_missing_run(self):
from specify_cli.workflows.steps.shell import ShellStep
step = ShellStep()
errors = step.validate({"id": "test"})
assert any("missing 'run'" in e for e in errors)
class TestGateStep:
"""Test the gate step type."""
def test_execute_returns_paused(self):
from specify_cli.workflows.steps.gate import GateStep
from specify_cli.workflows.base import StepContext, StepStatus
step = GateStep()
ctx = StepContext()
config = {
"id": "review",
"message": "Review the spec.",
"options": ["approve", "reject"],
"on_reject": "abort",
}
result = step.execute(config, ctx)
assert result.status == StepStatus.PAUSED
assert result.output["message"] == "Review the spec."
assert result.output["options"] == ["approve", "reject"]
def test_validate_missing_message(self):
from specify_cli.workflows.steps.gate import GateStep
step = GateStep()
errors = step.validate({"id": "test", "options": ["approve"]})
assert any("missing 'message'" in e for e in errors)
def test_validate_invalid_on_reject(self):
from specify_cli.workflows.steps.gate import GateStep
step = GateStep()
errors = step.validate({
"id": "test",
"message": "Review",
"on_reject": "invalid",
})
assert any("on_reject" in e for e in errors)
class TestIfThenStep:
"""Test the if/then/else step type."""
def test_execute_then_branch(self):
from specify_cli.workflows.steps.if_then import IfThenStep
from specify_cli.workflows.base import StepContext
step = IfThenStep()
ctx = StepContext(inputs={"scope": "full"})
config = {
"id": "check",
"condition": "{{ inputs.scope == 'full' }}",
"then": [{"id": "a", "command": "speckit.tasks"}],
"else": [{"id": "b", "command": "speckit.plan"}],
}
result = step.execute(config, ctx)
assert result.output["condition_result"] is True
assert len(result.next_steps) == 1
assert result.next_steps[0]["id"] == "a"
def test_execute_else_branch(self):
from specify_cli.workflows.steps.if_then import IfThenStep
from specify_cli.workflows.base import StepContext
step = IfThenStep()
ctx = StepContext(inputs={"scope": "backend"})
config = {
"id": "check",
"condition": "{{ inputs.scope == 'full' }}",
"then": [{"id": "a", "command": "speckit.tasks"}],
"else": [{"id": "b", "command": "speckit.plan"}],
}
result = step.execute(config, ctx)
assert result.output["condition_result"] is False
assert result.next_steps[0]["id"] == "b"
def test_validate_missing_condition(self):
from specify_cli.workflows.steps.if_then import IfThenStep
step = IfThenStep()
errors = step.validate({"id": "test", "then": []})
assert any("missing 'condition'" in e for e in errors)
class TestSwitchStep:
"""Test the switch step type."""
def test_execute_matches_case(self):
from specify_cli.workflows.steps.switch import SwitchStep
from specify_cli.workflows.base import StepContext
step = SwitchStep()
ctx = StepContext(
steps={"review": {"output": {"choice": "approve"}}}
)
config = {
"id": "route",
"expression": "{{ steps.review.output.choice }}",
"cases": {
"approve": [{"id": "plan", "command": "speckit.plan"}],
"reject": [{"id": "log", "type": "shell", "run": "echo rejected"}],
},
"default": [{"id": "abort", "type": "gate", "message": "Unknown"}],
}
result = step.execute(config, ctx)
assert result.output["matched_case"] == "approve"
assert result.next_steps[0]["id"] == "plan"
def test_execute_falls_to_default(self):
from specify_cli.workflows.steps.switch import SwitchStep
from specify_cli.workflows.base import StepContext
step = SwitchStep()
ctx = StepContext(
steps={"review": {"output": {"choice": "unknown"}}}
)
config = {
"id": "route",
"expression": "{{ steps.review.output.choice }}",
"cases": {
"approve": [{"id": "plan", "command": "speckit.plan"}],
},
"default": [{"id": "fallback", "type": "gate", "message": "Fallback"}],
}
result = step.execute(config, ctx)
assert result.output["matched_case"] == "__default__"
assert result.next_steps[0]["id"] == "fallback"
def test_execute_no_default_no_match(self):
from specify_cli.workflows.steps.switch import SwitchStep
from specify_cli.workflows.base import StepContext
step = SwitchStep()
ctx = StepContext(
steps={"review": {"output": {"choice": "other"}}}
)
config = {
"id": "route",
"expression": "{{ steps.review.output.choice }}",
"cases": {
"approve": [{"id": "plan", "command": "speckit.plan"}],
},
}
result = step.execute(config, ctx)
assert result.output["matched_case"] == "__default__"
assert result.next_steps == []
def test_validate_missing_expression(self):
from specify_cli.workflows.steps.switch import SwitchStep
step = SwitchStep()
errors = step.validate({"id": "test", "cases": {}})
assert any("missing 'expression'" in e for e in errors)
def test_validate_invalid_cases_and_default(self):
from specify_cli.workflows.steps.switch import SwitchStep
step = SwitchStep()
errors = step.validate({
"id": "test",
"expression": "{{ x }}",
"cases": {"a": "not-a-list"},
"default": "also-bad",
})
assert any("case 'a' must be a list" in e for e in errors)
assert any("'default' must be a list" in e for e in errors)
class TestWhileStep:
"""Test the while loop step type."""
def test_execute_condition_true(self):
from specify_cli.workflows.steps.while_loop import WhileStep
from specify_cli.workflows.base import StepContext
step = WhileStep()
ctx = StepContext(
steps={"run-tests": {"output": {"exit_code": 1}}}
)
config = {
"id": "retry",
"condition": "{{ steps.run-tests.output.exit_code != 0 }}",
"max_iterations": 5,
"steps": [{"id": "fix", "command": "speckit.implement"}],
}
result = step.execute(config, ctx)
assert result.output["condition_result"] is True
assert len(result.next_steps) == 1
def test_execute_condition_false(self):
from specify_cli.workflows.steps.while_loop import WhileStep
from specify_cli.workflows.base import StepContext
step = WhileStep()
ctx = StepContext(
steps={"run-tests": {"output": {"exit_code": 0}}}
)
config = {
"id": "retry",
"condition": "{{ steps.run-tests.output.exit_code != 0 }}",
"max_iterations": 5,
"steps": [{"id": "fix", "command": "speckit.implement"}],
}
result = step.execute(config, ctx)
assert result.output["condition_result"] is False
assert result.next_steps == []
def test_validate_missing_fields(self):
from specify_cli.workflows.steps.while_loop import WhileStep
step = WhileStep()
errors = step.validate({"id": "test", "steps": []})
assert any("missing 'condition'" in e for e in errors)
# max_iterations is optional (defaults to 10)
def test_validate_invalid_max_iterations(self):
from specify_cli.workflows.steps.while_loop import WhileStep
step = WhileStep()
errors = step.validate({"id": "test", "condition": "{{ true }}", "max_iterations": 0, "steps": []})
assert any("must be an integer >= 1" in e for e in errors)
class TestDoWhileStep:
"""Test the do-while loop step type."""
def test_execute_always_runs_once(self):
from specify_cli.workflows.steps.do_while import DoWhileStep
from specify_cli.workflows.base import StepContext
step = DoWhileStep()
ctx = StepContext()
config = {
"id": "cycle",
"condition": "{{ false }}",
"max_iterations": 3,
"steps": [{"id": "refine", "command": "speckit.specify"}],
}
result = step.execute(config, ctx)
assert len(result.next_steps) == 1
assert result.output["loop_type"] == "do-while"
assert result.output["condition"] == "{{ false }}"
def test_execute_with_true_condition(self):
from specify_cli.workflows.steps.do_while import DoWhileStep
from specify_cli.workflows.base import StepContext
step = DoWhileStep()
ctx = StepContext()
config = {
"id": "cycle",
"condition": "{{ true }}",
"max_iterations": 5,
"steps": [{"id": "work", "command": "speckit.plan"}],
}
result = step.execute(config, ctx)
# Body always executes on first call regardless of condition
assert len(result.next_steps) == 1
assert result.output["max_iterations"] == 5
def test_execute_empty_steps(self):
from specify_cli.workflows.steps.do_while import DoWhileStep
from specify_cli.workflows.base import StepContext
step = DoWhileStep()
ctx = StepContext()
config = {
"id": "empty",
"condition": "{{ false }}",
"max_iterations": 1,
"steps": [],
}
result = step.execute(config, ctx)
assert result.next_steps == []
assert result.status.value == "completed"
def test_validate_missing_fields(self):
from specify_cli.workflows.steps.do_while import DoWhileStep
step = DoWhileStep()
errors = step.validate({"id": "test", "steps": []})
assert any("missing 'condition'" in e for e in errors)
# max_iterations is optional (defaults to 10)
def test_validate_steps_not_list(self):
from specify_cli.workflows.steps.do_while import DoWhileStep
step = DoWhileStep()
errors = step.validate({
"id": "test",
"condition": "{{ true }}",
"max_iterations": 3,
"steps": "not-a-list",
})
assert any("'steps' must be a list" in e for e in errors)
class TestFanOutStep:
"""Test the fan-out step type."""
def test_execute_with_items(self):
from specify_cli.workflows.steps.fan_out import FanOutStep
from specify_cli.workflows.base import StepContext
step = FanOutStep()
ctx = StepContext(
steps={"tasks": {"output": {"task_list": [
{"file": "a.md"},
{"file": "b.md"},
]}}}
)
config = {
"id": "parallel",
"items": "{{ steps.tasks.output.task_list }}",
"max_concurrency": 3,
"step": {"id": "impl", "command": "speckit.implement"},
}
result = step.execute(config, ctx)
assert result.output["item_count"] == 2
assert result.output["max_concurrency"] == 3
def test_execute_non_list_items_resolves_empty(self):
from specify_cli.workflows.steps.fan_out import FanOutStep
from specify_cli.workflows.base import StepContext
step = FanOutStep()
ctx = StepContext()
config = {
"id": "parallel",
"items": "{{ undefined_var }}",
"step": {"id": "impl", "command": "speckit.implement"},
}
result = step.execute(config, ctx)
assert result.output["item_count"] == 0
assert result.output["items"] == []
def test_validate_missing_fields(self):
from specify_cli.workflows.steps.fan_out import FanOutStep
step = FanOutStep()
errors = step.validate({"id": "test"})
assert any("missing 'items'" in e for e in errors)
assert any("missing 'step'" in e for e in errors)
def test_validate_step_not_mapping(self):
from specify_cli.workflows.steps.fan_out import FanOutStep
step = FanOutStep()
errors = step.validate({
"id": "test",
"items": "{{ x }}",
"step": "not-a-dict",
})
assert any("'step' must be a mapping" in e for e in errors)
class TestFanInStep:
"""Test the fan-in step type."""
def test_execute_collects_results(self):
from specify_cli.workflows.steps.fan_in import FanInStep
from specify_cli.workflows.base import StepContext
step = FanInStep()
ctx = StepContext(
steps={
"parallel": {"output": {"item_count": 2, "status": "done"}}
}
)
config = {
"id": "collect",
"wait_for": ["parallel"],
"output": {},
}
result = step.execute(config, ctx)
assert len(result.output["results"]) == 1
assert result.output["results"][0]["item_count"] == 2
def test_execute_multiple_wait_for(self):
from specify_cli.workflows.steps.fan_in import FanInStep
from specify_cli.workflows.base import StepContext
step = FanInStep()
ctx = StepContext(
steps={
"task-a": {"output": {"file": "a.md"}},
"task-b": {"output": {"file": "b.md"}},
}
)
config = {
"id": "collect",
"wait_for": ["task-a", "task-b"],
"output": {},
}
result = step.execute(config, ctx)
assert len(result.output["results"]) == 2
assert result.output["results"][0]["file"] == "a.md"
assert result.output["results"][1]["file"] == "b.md"
def test_execute_missing_wait_for_step(self):
from specify_cli.workflows.steps.fan_in import FanInStep
from specify_cli.workflows.base import StepContext
step = FanInStep()
ctx = StepContext(steps={})
config = {
"id": "collect",
"wait_for": ["nonexistent"],
"output": {},
}
result = step.execute(config, ctx)
assert result.output["results"] == [{}]
def test_validate_empty_wait_for(self):
from specify_cli.workflows.steps.fan_in import FanInStep
step = FanInStep()
errors = step.validate({"id": "test", "wait_for": []})
assert any("non-empty list" in e for e in errors)
def test_validate_wait_for_not_list(self):
from specify_cli.workflows.steps.fan_in import FanInStep
step = FanInStep()
errors = step.validate({"id": "test", "wait_for": "not-a-list"})
assert any("non-empty list" in e for e in errors)
# ===== Workflow Definition Tests =====
class TestWorkflowDefinition:
"""Test WorkflowDefinition loading and parsing."""
def test_from_yaml(self, sample_workflow_file):
from specify_cli.workflows.engine import WorkflowDefinition
definition = WorkflowDefinition.from_yaml(sample_workflow_file)
assert definition.id == "test-workflow"
assert definition.name == "Test Workflow"
assert definition.version == "1.0.0"
assert len(definition.steps) == 2
def test_from_string(self, sample_workflow_yaml):
from specify_cli.workflows.engine import WorkflowDefinition
definition = WorkflowDefinition.from_string(sample_workflow_yaml)
assert definition.id == "test-workflow"
assert len(definition.inputs) == 2
def test_from_string_invalid(self):
from specify_cli.workflows.engine import WorkflowDefinition
with pytest.raises(ValueError, match="must be a mapping"):
WorkflowDefinition.from_string("- just a list")
def test_inputs_parsed(self, sample_workflow_yaml):
from specify_cli.workflows.engine import WorkflowDefinition
definition = WorkflowDefinition.from_string(sample_workflow_yaml)
assert "spec" in definition.inputs
assert definition.inputs["spec"]["required"] is True
assert definition.inputs["scope"]["default"] == "full"
# ===== Workflow Validation Tests =====
class TestWorkflowValidation:
"""Test workflow validation."""
def test_valid_workflow(self, sample_workflow_yaml):
from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow
definition = WorkflowDefinition.from_string(sample_workflow_yaml)
errors = validate_workflow(definition)
assert errors == []
def test_missing_id(self):
from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow
definition = WorkflowDefinition.from_string("""
workflow:
name: "Test"
version: "1.0.0"
steps:
- id: step-one
command: speckit.specify
""")
errors = validate_workflow(definition)
assert any("workflow.id" in e for e in errors)
def test_invalid_id_format(self):
from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow
definition = WorkflowDefinition.from_string("""
workflow:
id: "Invalid ID!"
name: "Test"
version: "1.0.0"
steps:
- id: step-one
command: speckit.specify
""")
errors = validate_workflow(definition)
assert any("lowercase alphanumeric" in e for e in errors)
def test_no_steps(self):
from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow
definition = WorkflowDefinition.from_string("""
workflow:
id: "test"
name: "Test"
version: "1.0.0"
steps: []
""")
errors = validate_workflow(definition)
assert any("no steps" in e.lower() for e in errors)
def test_duplicate_step_ids(self):
from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow
definition = WorkflowDefinition.from_string("""
workflow:
id: "test"
name: "Test"
version: "1.0.0"
steps:
- id: same-id
command: speckit.specify
- id: same-id
command: speckit.plan
""")
errors = validate_workflow(definition)
assert any("Duplicate" in e for e in errors)
def test_invalid_step_type(self):
from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow
definition = WorkflowDefinition.from_string("""
workflow:
id: "test"
name: "Test"
version: "1.0.0"
steps:
- id: bad
type: nonexistent
""")
errors = validate_workflow(definition)
assert any("invalid type" in e.lower() for e in errors)
def test_nested_step_validation(self):
from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow
definition = WorkflowDefinition.from_string("""
workflow:
id: "test"
name: "Test"
version: "1.0.0"
steps:
- id: branch
type: if
condition: "{{ true }}"
then:
- id: nested-a
command: speckit.specify
else:
- id: nested-b
command: speckit.plan
""")
errors = validate_workflow(definition)
assert errors == []
def test_invalid_input_type(self):
from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow
definition = WorkflowDefinition.from_string("""
workflow:
id: "test"
name: "Test"
version: "1.0.0"
inputs:
bad:
type: array
steps:
- id: step-one
command: speckit.specify
""")
errors = validate_workflow(definition)
assert any("invalid type" in e.lower() for e in errors)
# ===== Workflow Engine Tests =====
class TestWorkflowEngine:
"""Test WorkflowEngine execution."""
def test_load_from_file(self, sample_workflow_file, project_dir):
from specify_cli.workflows.engine import WorkflowEngine
engine = WorkflowEngine(project_dir)
definition = engine.load_workflow(str(sample_workflow_file))
assert definition.id == "test-workflow"
def test_load_from_installed_id(self, sample_workflow_file, project_dir):
from specify_cli.workflows.engine import WorkflowEngine
engine = WorkflowEngine(project_dir)
definition = engine.load_workflow("test-workflow")
assert definition.id == "test-workflow"
def test_load_not_found(self, project_dir):
from specify_cli.workflows.engine import WorkflowEngine
engine = WorkflowEngine(project_dir)
with pytest.raises(FileNotFoundError):
engine.load_workflow("nonexistent")
def test_execute_simple_workflow(self, project_dir):
from unittest.mock import patch
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
from specify_cli.workflows.base import RunStatus
yaml_str = """
schema_version: "1.0"
workflow:
id: "simple"
name: "Simple"
version: "1.0.0"
integration: claude
inputs:
name:
type: string
default: "test"
steps:
- id: step-one
command: speckit.specify
input:
args: "{{ inputs.name }}"
"""
definition = WorkflowDefinition.from_string(yaml_str)
engine = WorkflowEngine(project_dir)
with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None):
state = engine.execute(definition, {"name": "login"})
assert state.status == RunStatus.FAILED
assert "step-one" in state.step_results
assert state.step_results["step-one"]["output"]["command"] == "speckit.specify"
assert state.step_results["step-one"]["output"]["input"]["args"] == "login"
def test_execute_with_gate_pauses(self, project_dir):
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
from specify_cli.workflows.base import RunStatus
yaml_str = """
schema_version: "1.0"
workflow:
id: "gated"
name: "Gated"
version: "1.0.0"
steps:
- id: step-one
type: shell
run: "echo test"
- id: gate
type: gate
message: "Review?"
options: [approve, reject]
on_reject: abort
- id: step-two
type: shell
run: "echo done"
"""
definition = WorkflowDefinition.from_string(yaml_str)
engine = WorkflowEngine(project_dir)
state = engine.execute(definition)
assert state.status == RunStatus.PAUSED
assert "gate" in state.step_results
assert state.step_results["gate"]["status"] == "paused"
def test_execute_with_shell_step(self, project_dir):
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
from specify_cli.workflows.base import RunStatus
yaml_str = """
schema_version: "1.0"
workflow:
id: "shell-test"
name: "Shell Test"
version: "1.0.0"
steps:
- id: echo
type: shell
run: "echo workflow-output"
"""
definition = WorkflowDefinition.from_string(yaml_str)
engine = WorkflowEngine(project_dir)
state = engine.execute(definition)
assert state.status == RunStatus.COMPLETED
assert "workflow-output" in state.step_results["echo"]["output"]["stdout"]
def test_execute_with_if_then(self, project_dir):
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
from specify_cli.workflows.base import RunStatus
yaml_str = """
schema_version: "1.0"
workflow:
id: "branching"
name: "Branching"
version: "1.0.0"
inputs:
scope:
type: string
default: "full"
steps:
- id: check
type: if
condition: "{{ inputs.scope == 'full' }}"
then:
- id: full-tasks
type: shell
run: "echo full"
else:
- id: partial-tasks
type: shell
run: "echo partial"
"""
definition = WorkflowDefinition.from_string(yaml_str)
engine = WorkflowEngine(project_dir)
state = engine.execute(definition, {"scope": "full"})
assert state.status == RunStatus.COMPLETED
assert "full-tasks" in state.step_results
assert "partial-tasks" not in state.step_results
def test_execute_missing_required_input(self, project_dir):
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
yaml_str = """
schema_version: "1.0"
workflow:
id: "needs-input"
name: "Needs Input"
version: "1.0.0"
inputs:
name:
type: string
required: true
steps:
- id: step-one
command: speckit.specify
input:
args: "{{ inputs.name }}"
"""
definition = WorkflowDefinition.from_string(yaml_str)
engine = WorkflowEngine(project_dir)
with pytest.raises(ValueError, match="Required input"):
engine.execute(definition, {})
def test_integration_auto_default_uses_project_integration(self, project_dir):
"""`integration: auto` should resolve to .specify/integration.json's integration."""
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
specify_dir = project_dir / ".specify"
specify_dir.mkdir(parents=True, exist_ok=True)
(specify_dir / "integration.json").write_text(
json.dumps({"integration": "opencode", "version": "0.7.4"}),
encoding="utf-8",
)
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "auto-default"
name: "Auto Default"
version: "1.0.0"
inputs:
integration:
type: string
default: "auto"
""")
engine = WorkflowEngine(project_dir)
resolved = engine._resolve_inputs(definition, {})
assert resolved["integration"] == "opencode"
def test_integration_auto_default_falls_back_when_no_integration_json(self, project_dir):
"""`integration: auto` should keep the literal "auto" when project state is missing.
The engine itself must not invent an integration when
``.specify/integration.json`` is absent; any later validation or
command resolution will handle an unresolved ``"auto"`` value.
"""
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "auto-fallback"
name: "Auto Fallback"
version: "1.0.0"
inputs:
integration:
type: string
default: "auto"
""")
engine = WorkflowEngine(project_dir)
resolved = engine._resolve_inputs(definition, {})
assert resolved["integration"] == "auto"
def test_integration_explicit_input_overrides_auto(self, project_dir):
"""An explicit --input integration=X must win over `auto` even when integration.json exists."""
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
specify_dir = project_dir / ".specify"
specify_dir.mkdir(parents=True, exist_ok=True)
(specify_dir / "integration.json").write_text(
json.dumps({"integration": "opencode"}),
encoding="utf-8",
)
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "explicit-wins"
name: "Explicit Wins"
version: "1.0.0"
inputs:
integration:
type: string
default: "auto"
""")
engine = WorkflowEngine(project_dir)
resolved = engine._resolve_inputs(definition, {"integration": "claude"})
assert resolved["integration"] == "claude"
def test_integration_explicit_auto_resolves_like_default(self, project_dir):
"""Passing ``integration=auto`` explicitly must resolve the sentinel,
not pass it through as a literal — the workflow prompt advertises
``auto`` as a valid value, so the dispatch path must never see it.
"""
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
specify_dir = project_dir / ".specify"
specify_dir.mkdir(parents=True, exist_ok=True)
(specify_dir / "integration.json").write_text(
json.dumps({"integration": "opencode"}),
encoding="utf-8",
)
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "explicit-auto"
name: "Explicit Auto"
version: "1.0.0"
inputs:
integration:
type: string
default: "auto"
""")
engine = WorkflowEngine(project_dir)
resolved = engine._resolve_inputs(definition, {"integration": "auto"})
assert resolved["integration"] == "opencode"
def test_integration_auto_ignores_malformed_integration_json(self, project_dir):
"""A malformed integration.json must not crash — fall back to the literal default."""
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
specify_dir = project_dir / ".specify"
specify_dir.mkdir(parents=True, exist_ok=True)
(specify_dir / "integration.json").write_text("{not json", encoding="utf-8")
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "auto-malformed"
name: "Auto Malformed"
version: "1.0.0"
inputs:
integration:
type: string
default: "auto"
""")
engine = WorkflowEngine(project_dir)
resolved = engine._resolve_inputs(definition, {})
assert resolved["integration"] == "auto"
def test_integration_auto_ignores_non_utf8_integration_json(self, project_dir):
"""A non-UTF8 integration.json must not crash — fall back to the literal default."""
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
specify_dir = project_dir / ".specify"
specify_dir.mkdir(parents=True, exist_ok=True)
# 0xFF is invalid as the leading byte of a UTF-8 sequence, so
# ``Path.read_text(encoding="utf-8")`` raises UnicodeDecodeError.
(specify_dir / "integration.json").write_bytes(b"\xff\xfe\x00\x00")
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "auto-non-utf8"
name: "Auto Non UTF-8"
version: "1.0.0"
inputs:
integration:
type: string
default: "auto"
""")
engine = WorkflowEngine(project_dir)
resolved = engine._resolve_inputs(definition, {})
assert resolved["integration"] == "auto"
def test_integration_auto_resolves_modern_normalized_state(self, project_dir):
"""`integration: auto` must resolve modern state files that record
``default_integration`` / ``installed_integrations`` and omit the
legacy ``integration`` field."""
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
specify_dir = project_dir / ".specify"
specify_dir.mkdir(parents=True, exist_ok=True)
(specify_dir / "integration.json").write_text(
json.dumps(
{
"version": "0.8.3",
"integration_state_schema": 1,
"default_integration": "claude",
"installed_integrations": ["claude", "copilot"],
"integration_settings": {},
}
),
encoding="utf-8",
)
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "auto-modern"
name: "Auto Modern"
version: "1.0.0"
inputs:
integration:
type: string
default: "auto"
""")
engine = WorkflowEngine(project_dir)
resolved = engine._resolve_inputs(definition, {})
assert resolved["integration"] == "claude"
def test_integration_auto_rejects_future_state_schema(self, project_dir):
"""`integration: auto` must not silently use a state file written by a newer
CLI (``integration_state_schema`` greater than the current supported value);
the resolver falls back to the literal default rather than guessing."""
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
from specify_cli.integration_state import INTEGRATION_STATE_SCHEMA
specify_dir = project_dir / ".specify"
specify_dir.mkdir(parents=True, exist_ok=True)
(specify_dir / "integration.json").write_text(
json.dumps(
{
"version": "99.0.0",
"integration_state_schema": INTEGRATION_STATE_SCHEMA + 1,
"default_integration": "claude",
"installed_integrations": ["claude"],
"integration_settings": {},
}
),
encoding="utf-8",
)
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "auto-future-schema"
name: "Auto Future Schema"
version: "1.0.0"
inputs:
integration:
type: string
default: "auto"
""")
engine = WorkflowEngine(project_dir)
resolved = engine._resolve_inputs(definition, {})
assert resolved["integration"] == "auto"
def test_default_value_is_validated_against_enum(self, project_dir):
"""Defaults must run through the same coercion/enum check as provided inputs."""
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "default-enum"
name: "Default Enum"
version: "1.0.0"
inputs:
scope:
type: string
default: "not-in-enum"
enum: ["full", "backend-only", "frontend-only"]
""")
engine = WorkflowEngine(project_dir)
with pytest.raises(ValueError, match="not in allowed values"):
engine._resolve_inputs(definition, {})
def test_default_value_is_coerced_to_declared_type(self, project_dir):
"""A numeric default declared as a string should still be coerced like a provided input."""
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "default-coerce"
name: "Default Coerce"
version: "1.0.0"
inputs:
retries:
type: number
default: "3"
""")
engine = WorkflowEngine(project_dir)
resolved = engine._resolve_inputs(definition, {})
assert resolved["retries"] == 3
assert isinstance(resolved["retries"], int)
def test_validate_workflow_rejects_invalid_default(self):
"""Authoring-time validation should reject defaults that violate enum."""
from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "bad-default"
name: "Bad Default"
version: "1.0.0"
inputs:
scope:
type: string
default: "not-in-enum"
enum: ["full", "backend-only", "frontend-only"]
steps:
- id: noop
type: gate
message: "noop"
options: [approve]
""")
errors = validate_workflow(definition)
assert any("invalid default" in e for e in errors), errors
def test_validate_workflow_exempts_integration_auto_sentinel(self):
"""``integration: auto`` is a runtime-resolved sentinel and must not fail validation."""
from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "auto-ok"
name: "Auto OK"
version: "1.0.0"
inputs:
integration:
type: string
default: "auto"
enum: ["copilot", "claude", "gemini"]
steps:
- id: noop
type: gate
message: "noop"
options: [approve]
""")
errors = validate_workflow(definition)
assert not any("invalid default" in e for e in errors), errors
def test_validate_workflow_still_checks_type_for_auto_sentinel(self):
"""The ``auto`` exemption only skips enum-membership; declared type is still enforced."""
from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "auto-bad-type"
name: "Auto Bad Type"
version: "1.0.0"
inputs:
integration:
type: number
default: "auto"
steps:
- id: noop
type: gate
message: "noop"
options: [approve]
""")
errors = validate_workflow(definition)
assert any("invalid default" in e for e in errors), errors
def test_validate_workflow_rejects_bool_default_for_number_type(self):
"""``type: number`` paired with a bool default must fail — bool is a
subclass of int so ``float(True)`` would otherwise silently coerce
``true`` to ``1``.
"""
from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "bool-as-number"
name: "Bool As Number"
version: "1.0.0"
inputs:
count:
type: number
default: true
steps:
- id: noop
type: gate
message: "noop"
options: [approve]
""")
errors = validate_workflow(definition)
assert any("invalid default" in e for e in errors), errors
def test_validate_workflow_rejects_non_string_default_for_string_type(self):
"""``type: string`` must require an actual string — a numeric YAML
default like ``5`` would otherwise slip through unvalidated.
"""
from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "number-as-string"
name: "Number As String"
version: "1.0.0"
inputs:
label:
type: string
default: 5
steps:
- id: noop
type: gate
message: "noop"
options: [approve]
""")
errors = validate_workflow(definition)
assert any("invalid default" in e for e in errors), errors
def test_while_loop_condition_reads_latest_iteration(self, project_dir):
"""Regression: while-loop condition must see updated step output
from the most recent iteration, not stale iteration-0 data.
See https://github.com/github/spec-kit/issues/2592
"""
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
from specify_cli.workflows.base import RunStatus
# Shell step echoes a counter via a file.
# Condition: exit_code != 0 means "keep looping" — but a non-zero
# exit code would mark the step FAILED and abort the run, so we
# use stdout-based comparison instead.
#
# Iteration 0: counter=1, echoes "1" → not "done" → loop continues
# Iteration 1: counter=2, echoes "done" → condition false → stop
# Without the fix, condition always reads iteration-0 stdout,
# so the loop runs all max_iterations.
import sys
counter_file = project_dir / ".counter"
counter_file.write_text("0", encoding="utf-8")
py = sys.executable
script_file = project_dir / "_tick.py"
script_file.write_text(
f"import pathlib; p = pathlib.Path(r'{counter_file}')\n"
"n = int(p.read_text()) + 1; p.write_text(str(n))\n"
"print('done' if n >= 2 else str(n), end='')\n",
encoding="utf-8",
)
yaml_str = f"""
schema_version: "1.0"
workflow:
id: "while-condition-update"
name: "While Condition Update"
version: "1.0.0"
steps:
- id: retry-loop
type: while
condition: "{{{{ 'done' not in steps.attempt.output.stdout }}}}"
max_iterations: 5
steps:
- id: attempt
type: shell
run: '"{py}" "{script_file}"'
"""
definition = WorkflowDefinition.from_string(yaml_str)
engine = WorkflowEngine(project_dir)
state = engine.execute(definition)
assert state.status == RunStatus.COMPLETED
# The unprefixed key should reflect the latest iteration's result.
assert state.step_results["attempt"]["output"]["stdout"] == "done"
# Namespaced iteration-1 result should also exist.
assert "retry-loop:attempt:1" in state.step_results
# Counter should be 2 (iteration 0 + iteration 1), not 5.
assert counter_file.read_text(encoding="utf-8").strip() == "2"
def test_do_while_loop_condition_reads_latest_iteration(self, project_dir):
"""Regression: do-while loop condition must also see updated output.
See https://github.com/github/spec-kit/issues/2592
"""
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
from specify_cli.workflows.base import RunStatus
import sys
counter_file = project_dir / ".counter"
counter_file.write_text("0", encoding="utf-8")
py = sys.executable
script_file = project_dir / "_tick.py"
script_file.write_text(
f"import pathlib; p = pathlib.Path(r'{counter_file}')\n"
"n = int(p.read_text()) + 1; p.write_text(str(n))\n"
"print('done' if n >= 2 else str(n), end='')\n",
encoding="utf-8",
)
yaml_str = f"""
schema_version: "1.0"
workflow:
id: "do-while-condition-update"
name: "Do While Condition Update"
version: "1.0.0"
steps:
- id: retry-loop
type: do-while
condition: "{{{{ 'done' not in steps.attempt.output.stdout }}}}"
max_iterations: 5
steps:
- id: attempt
type: shell
run: '"{py}" "{script_file}"'
"""
definition = WorkflowDefinition.from_string(yaml_str)
engine = WorkflowEngine(project_dir)
state = engine.execute(definition)
assert state.status == RunStatus.COMPLETED
assert state.step_results["attempt"]["output"]["stdout"] == "done"
assert counter_file.read_text(encoding="utf-8").strip() == "2"
def test_while_loop_runs_to_max_when_condition_stays_true(self, project_dir):
"""While loop must still run to max_iterations when the condition
never becomes false — copy-back must not break this path.
See https://github.com/github/spec-kit/issues/2592
"""
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
from specify_cli.workflows.base import RunStatus
import sys
counter_file = project_dir / ".counter"
counter_file.write_text("0", encoding="utf-8")
py = sys.executable
script_file = project_dir / "_tick.py"
script_file.write_text(
f"import pathlib; p = pathlib.Path(r'{counter_file}')\n"
"n = int(p.read_text()) + 1; p.write_text(str(n))\n"
"print('pending', end='')\n",
encoding="utf-8",
)
yaml_str = f"""
schema_version: "1.0"
workflow:
id: "while-max-iterations"
name: "While Max Iterations"
version: "1.0.0"
steps:
- id: retry-loop
type: while
condition: "{{{{ 'done' not in steps.tick.output.stdout }}}}"
max_iterations: 3
steps:
- id: tick
type: shell
run: '"{py}" "{script_file}"'
"""
definition = WorkflowDefinition.from_string(yaml_str)
engine = WorkflowEngine(project_dir)
state = engine.execute(definition)
assert state.status == RunStatus.COMPLETED
# All 3 iterations ran (iteration 0 + 2 loop iterations).
assert counter_file.read_text(encoding="utf-8").strip() == "3"
# Unprefixed key holds the last iteration's result.
assert state.step_results["tick"]["output"]["stdout"] == "pending"
# Namespaced keys for loop iterations exist.
assert "retry-loop:tick:1" in state.step_results
assert "retry-loop:tick:2" in state.step_results
def test_do_while_loop_runs_to_max_when_condition_stays_true(self, project_dir):
"""Do-while loop must still run to max_iterations when the condition
never becomes false.
See https://github.com/github/spec-kit/issues/2592
"""
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
from specify_cli.workflows.base import RunStatus
import sys
counter_file = project_dir / ".counter"
counter_file.write_text("0", encoding="utf-8")
py = sys.executable
script_file = project_dir / "_tick.py"
script_file.write_text(
f"import pathlib; p = pathlib.Path(r'{counter_file}')\n"
"n = int(p.read_text()) + 1; p.write_text(str(n))\n"
"print('pending', end='')\n",
encoding="utf-8",
)
yaml_str = f"""
schema_version: "1.0"
workflow:
id: "do-while-max-iterations"
name: "Do While Max Iterations"
version: "1.0.0"
steps:
- id: retry-loop
type: do-while
condition: "{{{{ 'done' not in steps.tick.output.stdout }}}}"
max_iterations: 3
steps:
- id: tick
type: shell
run: '"{py}" "{script_file}"'
"""
definition = WorkflowDefinition.from_string(yaml_str)
engine = WorkflowEngine(project_dir)
state = engine.execute(definition)
assert state.status == RunStatus.COMPLETED
assert counter_file.read_text(encoding="utf-8").strip() == "3"
assert state.step_results["tick"]["output"]["stdout"] == "pending"
def test_while_loop_multi_step_body_inter_step_refs(self, project_dir):
"""Multi-step loop body: step B must see step A's output from the
current iteration, not a stale previous one.
See https://github.com/github/spec-kit/issues/2592
"""
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
from specify_cli.workflows.base import RunStatus
import sys
counter_file = project_dir / ".counter"
counter_file.write_text("0", encoding="utf-8")
py = sys.executable
# Step A: increments counter file, echoes the value.
step_a_file = project_dir / "_step_a.py"
step_a_file.write_text(
f"import pathlib; p = pathlib.Path(r'{counter_file}')\n"
"n = int(p.read_text()) + 1; p.write_text(str(n))\n"
"print(str(n), end='')\n",
encoding="utf-8",
)
# Step B uses {{ steps.step-a.output.stdout }} expression
# substitution in its run command so the engine resolves the
# aliased unprefixed key — this is the real inter-step test.
yaml_str = f"""
schema_version: "1.0"
workflow:
id: "while-multi-step"
name: "While Multi Step"
version: "1.0.0"
steps:
- id: retry-loop
type: while
condition: "{{{{ 'done' not in steps.step-a.output.stdout }}}}"
max_iterations: 3
steps:
- id: step-a
type: shell
run: '"{py}" "{step_a_file}"'
- id: step-b
type: shell
run: "echo b-saw-{{{{ steps.step-a.output.stdout }}}}"
"""
definition = WorkflowDefinition.from_string(yaml_str)
engine = WorkflowEngine(project_dir)
state = engine.execute(definition)
assert state.status == RunStatus.COMPLETED
# Both unprefixed keys reflect the latest iteration's results.
assert state.step_results["step-a"]["output"]["stdout"] == "3"
# Step B saw step A's output via expression substitution.
assert "b-saw-3" in state.step_results["step-b"]["output"]["stdout"]
# Namespaced keys exist for loop iterations.
assert "retry-loop:step-a:1" in state.step_results
assert "retry-loop:step-b:1" in state.step_results
assert "retry-loop:step-a:2" in state.step_results
assert "retry-loop:step-b:2" in state.step_results
# ===== context.run_id Tests =====
#
# End-to-end coverage for the `{{ context.run_id }}` template
# variable introduced in issue #2590. Locks resolution inside the
# three step types the acceptance criteria called out — shell `run:`,
# command `input.args:`, and switch `expression:` — plus the
# "workflow doesn't reference it" backward-compat path.
class TestContextRunId:
"""End-to-end tests for `{{ context.run_id }}` in workflow YAML."""
def test_shell_run_resolves_run_id(self, project_dir):
"""`run: "echo {{ context.run_id }}"` substitutes the
engine-assigned run id into the spawned shell, and the
same value appears on `state.run_id`.
"""
from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "stamp-run-id"
name: "Stamp Run Id"
version: "1.0.0"
steps:
- id: stamp
type: shell
run: "echo RUN_ID={{ context.run_id }}"
""")
engine = WorkflowEngine(project_dir)
state = engine.execute(definition, run_id="abc12345")
assert state.run_id == "abc12345"
stdout = state.step_results["stamp"]["output"]["stdout"]
assert stdout.strip() == "RUN_ID=abc12345"
def test_command_input_args_resolves_run_id(self, project_dir):
"""`input.args: "{{ context.run_id }}"` is resolved by
`CommandStep` and recorded in step output, even when CLI
dispatch is unavailable (no integration installed). Covers
the artifact-metadata use case from the issue.
"""
from unittest.mock import patch
from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "command-stamp"
name: "Command Stamp"
version: "1.0.0"
integration: claude
steps:
- id: tag-artifact
command: speckit.specify
input:
args: "{{ context.run_id }}"
""")
engine = WorkflowEngine(project_dir)
with patch(
"specify_cli.workflows.steps.command.shutil.which",
return_value=None,
):
state = engine.execute(definition, run_id="cafef00d")
# Even when dispatch fails (no CLI), the resolved input is
# recorded so downstream observers see the run id in artifact
# metadata.
assert state.step_results["tag-artifact"]["output"]["input"]["args"] == "cafef00d"
def test_switch_expression_matches_on_run_id(self, project_dir):
"""`switch` over `{{ context.run_id }}` matches against case
keys, and the nested branch can ALSO reference
`{{ context.run_id }}`. Demonstrates the run id is a
first-class value in the expression engine (not just a
string-interpolation token) AND that it propagates into
nested step execution via the recursive `_execute_steps`
traversal.
"""
from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine
from specify_cli.workflows.base import RunStatus
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "switch-on-run-id"
name: "Switch On Run Id"
version: "1.0.0"
steps:
- id: route
type: switch
expression: "{{ context.run_id }}"
cases:
target-run:
- id: matched-branch
type: shell
run: "echo nested-run-id={{ context.run_id }}"
default:
- id: default-branch
type: shell
run: "echo defaulted"
""")
engine = WorkflowEngine(project_dir)
state = engine.execute(definition, run_id="target-run")
assert state.status == RunStatus.COMPLETED
assert state.step_results["route"]["output"]["matched_case"] == "target-run"
assert "matched-branch" in state.step_results
assert "default-branch" not in state.step_results
# The nested branch sees the same run id — propagation through
# recursive `_execute_steps` is intact.
nested_stdout = state.step_results["matched-branch"]["output"]["stdout"]
assert nested_stdout.strip() == "nested-run-id=target-run"
def test_workflow_without_context_reference_unchanged(self, project_dir):
"""Workflows that do not reference `{{ context.run_id }}`
continue to run exactly as before. Locks the byte-equivalent
default required by the issue's acceptance criteria.
"""
from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine
from specify_cli.workflows.base import RunStatus
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "no-context-ref"
name: "No Context Ref"
version: "1.0.0"
steps:
- id: only-step
type: shell
run: "echo hello"
""")
engine = WorkflowEngine(project_dir)
state = engine.execute(definition)
assert state.status == RunStatus.COMPLETED
assert state.step_results["only-step"]["output"]["stdout"].strip() == "hello"
def test_run_id_uses_speckit_workflow_run_id_env_override(self, project_dir, monkeypatch):
"""When no run_id argument is provided, SPECKIT_WORKFLOW_RUN_ID overrides the auto-generated run ID."""
from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine
monkeypatch.setenv("SPECKIT_WORKFLOW_RUN_ID", "env-run-123")
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "env-run-id"
name: "Env Run Id"
version: "1.0.0"
steps:
- id: stamp
type: shell
run: "echo {{ context.run_id }}"
""")
state = WorkflowEngine(project_dir).execute(definition)
assert state.run_id == "env-run-123"
assert state.step_results["stamp"]["output"]["stdout"].strip() == "env-run-123"
def test_run_id_arg_takes_precedence_over_env_override(self, project_dir, monkeypatch):
"""Explicit run_id keeps existing precedence over SPECKIT_WORKFLOW_RUN_ID."""
from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine
monkeypatch.setenv("SPECKIT_WORKFLOW_RUN_ID", "env-run-123")
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "explicit-run-id"
name: "Explicit Run Id"
version: "1.0.0"
steps:
- id: stamp
type: shell
run: "echo {{ context.run_id }}"
""")
state = WorkflowEngine(project_dir).execute(definition, run_id="explicit-456")
assert state.run_id == "explicit-456"
assert state.step_results["stamp"]["output"]["stdout"].strip() == "explicit-456"
# ===== continue_on_error Tests =====
#
# Locks the contract documented in workflows/README.md "Error Handling"
# section: when a step returns `StepResult(status=StepStatus.FAILED, ...)` and
# `continue_on_error: true` is declared, the engine records the step's
# `output` (with `exit_code` and `stderr` from the failure) and its
# `status` (sibling key on `steps.<id>`, not nested under `output`)
# and continues to the next sibling step instead of halting the run.
# Gate aborts (`output.aborted`) still halt regardless of the flag.
# Unhandled exceptions raised out of `step_impl.execute()` are out of
# scope for this flag — they propagate to `WorkflowEngine.execute()`
# and abort the run.
class TestContinueOnError:
"""Test the `continue_on_error` step-level field."""
def test_undeclared_failure_halts_run(self, project_dir):
"""Default behaviour (no `continue_on_error`): a failing step
halts the workflow run with `status == StepStatus.FAILED`.
Locks the byte-equivalent default — workflows that do not
declare the flag must behave exactly as before this feature.
"""
from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine
from specify_cli.workflows.base import RunStatus
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "halt-on-fail"
name: "Halt On Fail"
version: "1.0.0"
steps:
- id: fail-step
type: shell
run: "exit 7"
- id: after
type: shell
run: "echo should-not-run"
""")
engine = WorkflowEngine(project_dir)
state = engine.execute(definition)
assert state.status == RunStatus.FAILED
assert "fail-step" in state.step_results
assert state.step_results["fail-step"]["output"]["exit_code"] == 7
# Subsequent step never executes when the flag is absent.
assert "after" not in state.step_results
def test_declared_and_fired_continues_run(self, project_dir):
"""`continue_on_error: true` + failing step: the run keeps
going, the failed step's result is recorded, and the
downstream step runs.
"""
from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine
from specify_cli.workflows.base import RunStatus
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "continue-past-fail"
name: "Continue Past Fail"
version: "1.0.0"
steps:
- id: flaky-step
type: shell
run: "exit 42"
continue_on_error: true
- id: after
type: shell
run: "echo did-run"
""")
engine = WorkflowEngine(project_dir)
state = engine.execute(definition)
assert state.status == RunStatus.COMPLETED
# Failed step's exit_code is preserved so downstream branching
# can inspect it.
assert state.step_results["flaky-step"]["output"]["exit_code"] == 42
assert state.step_results["flaky-step"]["status"] == "failed"
# Downstream step ran successfully.
assert state.step_results["after"]["output"]["exit_code"] == 0
def test_declared_but_step_succeeded_is_noop(self, project_dir):
"""`continue_on_error: true` on a step that succeeds is a
no-op — the flag only changes behaviour on StepStatus.FAILED status.
"""
from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine
from specify_cli.workflows.base import RunStatus
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "flag-but-success"
name: "Flag But Success"
version: "1.0.0"
steps:
- id: ok-step
type: shell
run: "echo ok"
continue_on_error: true
- id: after
type: shell
run: "echo done"
""")
engine = WorkflowEngine(project_dir)
state = engine.execute(definition)
assert state.status == RunStatus.COMPLETED
assert state.step_results["ok-step"]["status"] == "completed"
assert state.step_results["ok-step"]["output"]["exit_code"] == 0
assert state.step_results["after"]["output"]["exit_code"] == 0
def test_if_branch_routes_around_failure(self, project_dir):
"""End-to-end: `continue_on_error` + `if` cleanly routes around
a failure. The recovery branch runs; the success branch does
not.
Mirrors the canonical usage pattern from the original feature
discussion in issue #2591.
"""
from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine
from specify_cli.workflows.base import RunStatus
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "route-around"
name: "Route Around Failure"
version: "1.0.0"
steps:
- id: heavy-thing
type: shell
run: "exit 1"
continue_on_error: true
- id: check-result
type: if
condition: "{{ steps.heavy-thing.output.exit_code != 0 }}"
then:
- id: recovery
type: shell
run: "echo recovery-ran"
else:
- id: happy-path
type: shell
run: "echo happy-path-ran"
""")
engine = WorkflowEngine(project_dir)
state = engine.execute(definition)
assert state.status == RunStatus.COMPLETED
assert "recovery" in state.step_results
assert "happy-path" not in state.step_results
def test_gate_abort_still_halts_with_continue_on_error(
self, project_dir, monkeypatch
):
"""`continue_on_error` does NOT override a deliberate gate
abort. `output.aborted` always halts the run with
`status == ABORTED`.
Aborts are explicit operator decisions; continue_on_error
is for transient/expected step failures only.
"""
from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine
from specify_cli.workflows.base import RunStatus
from specify_cli.workflows.steps.gate import GateStep
from specify_cli.workflows.steps import gate as gate_module
# Force the gate step into interactive mode and feed a "reject"
# choice so the abort path actually runs in the test env
# (default behaviour returns StepStatus.PAUSED when stdin is not a TTY).
# Swap sys.stdin itself for a stub: setattr on the real
# TextIOWrapper's `isatty` method is not assignable under some
# runners (e.g. pytest with capture disabled).
class _TTYStdin:
def isatty(self) -> bool:
return True
monkeypatch.setattr(gate_module.sys, "stdin", _TTYStdin())
monkeypatch.setattr(
GateStep, "_prompt", staticmethod(lambda _msg, _opts: "reject")
)
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "gate-abort-halts"
name: "Gate Abort Halts"
version: "1.0.0"
steps:
- id: gate-step
type: gate
message: "Approve?"
options: [approve, reject]
on_reject: abort
continue_on_error: true
- id: should-not-run
type: shell
run: "echo nope"
""")
engine = WorkflowEngine(project_dir)
state = engine.execute(definition)
assert state.status == RunStatus.ABORTED
assert "should-not-run" not in state.step_results
def test_validation_rejects_non_bool_continue_on_error(self):
"""`continue_on_error` must be a literal boolean; coerced
strings like `"true"` are rejected at validation time so
authoring mistakes surface before execution.
"""
from specify_cli.workflows.engine import (
WorkflowDefinition,
validate_workflow,
)
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "bad-coe"
name: "Bad COE"
version: "1.0.0"
steps:
- id: step-one
type: shell
run: "true"
continue_on_error: "true"
""")
errors = validate_workflow(definition)
assert any(
"continue_on_error" in e and "boolean" in e for e in errors
), errors
def test_validation_accepts_bool_continue_on_error(self):
"""Boolean values pass validation cleanly."""
from specify_cli.workflows.engine import (
WorkflowDefinition,
validate_workflow,
)
for value in (True, False):
yaml_value = "true" if value else "false"
definition = WorkflowDefinition.from_string(f"""
schema_version: "1.0"
workflow:
id: "good-coe"
name: "Good COE"
version: "1.0.0"
steps:
- id: step-one
type: shell
run: "true"
continue_on_error: {yaml_value}
""")
errors = validate_workflow(definition)
assert errors == [], errors
def test_engine_ignores_truthy_non_bool_continue_on_error(self, project_dir):
"""Defense-in-depth: even if a caller bypasses
`validate_workflow()` and feeds the engine a definition with
`continue_on_error: "true"` (a string), the engine must NOT
honour the flag — only a literal boolean enables the
behaviour. `WorkflowEngine.execute()` does not auto-validate
(the `WorkflowEngine.load_workflow` docstring explicitly
notes the definition is "not yet validated; call
`validate_workflow()` or `engine.validate()` separately"),
so the engine guards against truthy non-bool values itself
via an identity check rather than truthiness.
"""
from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine
from specify_cli.workflows.base import RunStatus
# Bypass `validate_workflow()` — execute() is what would
# be called by a caller that skipped validation.
definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
id: "string-coe"
name: "String COE"
version: "1.0.0"
steps:
- id: fail-step
type: shell
run: "exit 1"
continue_on_error: "true"
- id: should-not-run
type: shell
run: "echo should-not-run"
""")
engine = WorkflowEngine(project_dir)
state = engine.execute(definition)
# String "true" is truthy but not a literal boolean, so the
# engine must treat the step as a halting failure.
assert state.status == RunStatus.FAILED
assert "should-not-run" not in state.step_results
# ===== State Persistence Tests =====
class TestRunState:
"""Test RunState persistence and loading."""
def test_save_and_load(self, project_dir):
from specify_cli.workflows.engine import RunState
from specify_cli.workflows.base import RunStatus
state = RunState(
run_id="test-run",
workflow_id="test-workflow",
project_root=project_dir,
)
state.status = RunStatus.RUNNING
state.inputs = {"name": "login"}
state.step_results = {
"step-one": {
"output": {"file": "spec.md"},
"status": "completed",
}
}
state.save()
loaded = RunState.load("test-run", project_dir)
assert loaded.run_id == "test-run"
assert loaded.workflow_id == "test-workflow"
assert loaded.status == RunStatus.RUNNING
assert loaded.inputs == {"name": "login"}
assert "step-one" in loaded.step_results
def test_load_not_found(self, project_dir):
from specify_cli.workflows.engine import RunState
with pytest.raises(FileNotFoundError):
RunState.load("nonexistent", project_dir)
@pytest.mark.parametrize(
"malicious_run_id",
[
# Parent-directory traversal — the classic path-escape vector.
"../escape",
"..",
"../../etc/passwd",
# Embedded path separators — both POSIX and Windows.
"foo/bar",
"foo\\bar",
# Leading non-alphanumeric characters that the existing
# pattern's anchor blocks (would be mistaken for CLI flags
# or hidden files in shell completions / error messages).
".hidden",
"-flag",
# NUL byte — some filesystems treat the prefix as a valid
# path and silently truncate at the NUL.
"foo\x00bar",
# Empty string — degenerate case, matches no file but the
# validator should reject it before any I/O.
"",
],
)
def test_load_rejects_path_traversal(self, project_dir, malicious_run_id):
"""``RunState.load`` validates ``run_id`` before touching the
filesystem.
Without this guard, a value like ``../escape`` passed via
``specify workflow resume`` would interpolate path-traversal
segments into the lookup path. ``state_path.exists()`` would
probe arbitrary paths the process can read (a file-existence
oracle) and ``json.load`` would happily parse attacker-planted
JSON from outside ``.specify/workflows/runs/``. The check must
fire *before* the path is built — ``__init__``'s identical
regex on ``state_data["run_id"]`` fires too late.
"""
from specify_cli.workflows.engine import RunState
# Plant a state.json *outside* the legitimate ``runs/`` directory
# at the location ``../escape`` would traverse to, so a missing
# guard would surface as a successful load rather than a
# ``FileNotFoundError`` (which would be ambiguous with the
# not-found case).
runs_dir = project_dir / ".specify" / "workflows" / "runs"
runs_dir.mkdir(parents=True, exist_ok=True)
attacker_dir = project_dir / ".specify" / "workflows" / "escape"
attacker_dir.mkdir(exist_ok=True)
(attacker_dir / "state.json").write_text(
json.dumps(
{
"run_id": "pwned",
"workflow_id": "attacker-owned",
"status": "created",
}
),
encoding="utf-8",
)
with pytest.raises(ValueError, match="Invalid run_id"):
RunState.load(malicious_run_id, project_dir)
@pytest.mark.parametrize(
"bad_run_id",
[
# One vector per category from ``test_load_rejects_path_traversal``
# — enough to prove both entry points agree without re-running
# the full attack matrix here.
"../escape", # parent-directory traversal
"foo/bar", # embedded path separator
".hidden", # leading non-alphanumeric
"", # empty / degenerate
],
)
def test_init_and_load_share_validation(self, project_dir, bad_run_id):
"""``__init__`` *and* ``load`` reject the same malformed IDs.
The two entry points must stay in sync — drift would let an ID
slip in via one path that the other would reject, producing
confusing crashes mid-workflow. The previous version of this
test only exercised ``__init__`` and ``_validate_run_id`` (the
shared helper), so a regression in ``load`` — e.g. someone
deleting the ``cls._validate_run_id(run_id)`` call there — could
slip through despite ``__init__`` and the helper staying
aligned. We now hit ``load`` directly with the same vector so
any drift between the two call sites is caught by this test.
"""
from specify_cli.workflows.engine import RunState
# ``__init__`` rejects up front.
with pytest.raises(ValueError, match="Invalid run_id"):
RunState(run_id=bad_run_id)
# The shared helper rejects the value too (sanity check that the
# ``__init__`` rejection came from the validator, not some
# unrelated constructor failure).
with pytest.raises(ValueError, match="Invalid run_id"):
RunState._validate_run_id(bad_run_id)
# And ``load`` rejects it *before* touching the filesystem. This
# is the assertion the previous version was missing: without it,
# a regression in ``load`` (e.g. forgetting to call the
# validator before building the path) would not be caught even
# though ``__init__`` and the helper still agreed.
with pytest.raises(ValueError, match="Invalid run_id"):
RunState.load(bad_run_id, project_dir)
def test_append_log(self, project_dir):
from specify_cli.workflows.engine import RunState
state = RunState(
run_id="log-test",
workflow_id="test",
project_root=project_dir,
)
state.append_log({"event": "test_event", "data": "hello"})
log_file = state.runs_dir / "log.jsonl"
assert log_file.exists()
lines = log_file.read_text().strip().split("\n")
entry = json.loads(lines[0])
assert entry["event"] == "test_event"
assert "timestamp" in entry
class TestListRuns:
"""Test listing workflow runs."""
def test_list_empty(self, project_dir):
from specify_cli.workflows.engine import WorkflowEngine
engine = WorkflowEngine(project_dir)
assert engine.list_runs() == []
def test_list_after_execution(self, project_dir):
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
yaml_str = """
schema_version: "1.0"
workflow:
id: "list-test"
name: "List Test"
version: "1.0.0"
steps:
- id: step-one
type: shell
run: "echo test"
"""
definition = WorkflowDefinition.from_string(yaml_str)
engine = WorkflowEngine(project_dir)
engine.execute(definition)
runs = engine.list_runs()
assert len(runs) == 1
assert runs[0]["workflow_id"] == "list-test"
# ===== Workflow Registry Tests =====
class TestWorkflowRegistry:
"""Test WorkflowRegistry operations."""
def test_add_and_get(self, project_dir):
from specify_cli.workflows.catalog import WorkflowRegistry
registry = WorkflowRegistry(project_dir)
registry.add("test-wf", {"name": "Test", "version": "1.0.0"})
entry = registry.get("test-wf")
assert entry is not None
assert entry["name"] == "Test"
assert "installed_at" in entry
def test_remove(self, project_dir):
from specify_cli.workflows.catalog import WorkflowRegistry
registry = WorkflowRegistry(project_dir)
registry.add("test-wf", {"name": "Test"})
assert registry.is_installed("test-wf")
registry.remove("test-wf")
assert not registry.is_installed("test-wf")
def test_list(self, project_dir):
from specify_cli.workflows.catalog import WorkflowRegistry
registry = WorkflowRegistry(project_dir)
registry.add("wf-a", {"name": "A"})
registry.add("wf-b", {"name": "B"})
installed = registry.list()
assert "wf-a" in installed
assert "wf-b" in installed
def test_is_installed(self, project_dir):
from specify_cli.workflows.catalog import WorkflowRegistry
registry = WorkflowRegistry(project_dir)
assert not registry.is_installed("missing")
registry.add("exists", {"name": "Exists"})
assert registry.is_installed("exists")
def test_persistence(self, project_dir):
from specify_cli.workflows.catalog import WorkflowRegistry
registry1 = WorkflowRegistry(project_dir)
registry1.add("test-wf", {"name": "Test"})
# Load fresh
registry2 = WorkflowRegistry(project_dir)
assert registry2.is_installed("test-wf")
# ===== Workflow Catalog Tests =====
class TestWorkflowCatalog:
"""Test WorkflowCatalog catalog resolution."""
def test_default_catalogs(self, project_dir):
from specify_cli.workflows.catalog import WorkflowCatalog
catalog = WorkflowCatalog(project_dir)
entries = catalog.get_active_catalogs()
assert len(entries) == 2
assert entries[0].name == "default"
assert entries[1].name == "community"
def test_env_var_override(self, project_dir, monkeypatch):
from specify_cli.workflows.catalog import WorkflowCatalog
monkeypatch.setenv("SPECKIT_WORKFLOW_CATALOG_URL", "https://example.com/catalog.json")
catalog = WorkflowCatalog(project_dir)
entries = catalog.get_active_catalogs()
assert len(entries) == 1
assert entries[0].name == "env-override"
assert entries[0].url == "https://example.com/catalog.json"
def test_project_level_config(self, project_dir):
from specify_cli.workflows.catalog import WorkflowCatalog
config_path = project_dir / ".specify" / "workflow-catalogs.yml"
config_path.write_text(yaml.dump({
"catalogs": [{
"name": "custom",
"url": "https://example.com/wf-catalog.json",
"priority": 1,
"install_allowed": True,
}]
}))
catalog = WorkflowCatalog(project_dir)
entries = catalog.get_active_catalogs()
assert len(entries) == 1
assert entries[0].name == "custom"
def test_validate_url_http_rejected(self, project_dir):
from specify_cli.workflows.catalog import WorkflowCatalog, WorkflowValidationError
catalog = WorkflowCatalog(project_dir)
with pytest.raises(WorkflowValidationError, match="HTTPS"):
catalog._validate_catalog_url("http://evil.com/catalog.json")
def test_validate_url_localhost_http_allowed(self, project_dir):
from specify_cli.workflows.catalog import WorkflowCatalog
catalog = WorkflowCatalog(project_dir)
# Should not raise
catalog._validate_catalog_url("http://localhost:8080/catalog.json")
def test_add_catalog(self, project_dir):
from specify_cli.workflows.catalog import WorkflowCatalog
catalog = WorkflowCatalog(project_dir)
catalog.add_catalog("https://example.com/new-catalog.json", "my-catalog")
config_path = project_dir / ".specify" / "workflow-catalogs.yml"
assert config_path.exists()
data = yaml.safe_load(config_path.read_text())
assert len(data["catalogs"]) == 1
assert data["catalogs"][0]["url"] == "https://example.com/new-catalog.json"
def test_add_catalog_duplicate_rejected(self, project_dir):
from specify_cli.workflows.catalog import WorkflowCatalog, WorkflowValidationError
catalog = WorkflowCatalog(project_dir)
catalog.add_catalog("https://example.com/catalog.json")
with pytest.raises(WorkflowValidationError, match="already configured"):
catalog.add_catalog("https://example.com/catalog.json")
def test_remove_catalog(self, project_dir):
from specify_cli.workflows.catalog import WorkflowCatalog
catalog = WorkflowCatalog(project_dir)
catalog.add_catalog("https://example.com/c1.json", "first")
catalog.add_catalog("https://example.com/c2.json", "second")
removed = catalog.remove_catalog(0)
assert removed == "first"
config_path = project_dir / ".specify" / "workflow-catalogs.yml"
data = yaml.safe_load(config_path.read_text())
assert len(data["catalogs"]) == 1
def test_remove_catalog_invalid_index(self, project_dir):
from specify_cli.workflows.catalog import WorkflowCatalog, WorkflowValidationError
catalog = WorkflowCatalog(project_dir)
catalog.add_catalog("https://example.com/c1.json")
with pytest.raises(WorkflowValidationError, match="out of range"):
catalog.remove_catalog(5)
def test_get_catalog_configs(self, project_dir):
from specify_cli.workflows.catalog import WorkflowCatalog
catalog = WorkflowCatalog(project_dir)
configs = catalog.get_catalog_configs()
assert len(configs) == 2
assert configs[0]["name"] == "default"
assert isinstance(configs[0]["install_allowed"], bool)
# ===== Integration Test =====
class TestWorkflowIntegration:
"""End-to-end workflow execution tests."""
def test_full_sequential_workflow(self, project_dir):
"""Execute a multi-step sequential workflow end to end."""
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
from specify_cli.workflows.base import RunStatus
yaml_str = """
schema_version: "1.0"
workflow:
id: "e2e-test"
name: "E2E Test"
version: "1.0.0"
integration: claude
inputs:
feature:
type: string
default: "login"
steps:
- id: specify
type: shell
run: "echo speckit.specify {{ inputs.feature }}"
- id: check-scope
type: if
condition: "{{ inputs.feature == 'login' }}"
then:
- id: echo-full
type: shell
run: "echo full scope"
else:
- id: echo-partial
type: shell
run: "echo partial scope"
- id: plan
type: shell
run: "echo speckit.plan"
"""
definition = WorkflowDefinition.from_string(yaml_str)
engine = WorkflowEngine(project_dir)
state = engine.execute(definition)
assert state.status == RunStatus.COMPLETED
assert "specify" in state.step_results
assert "check-scope" in state.step_results
assert "echo-full" in state.step_results
assert "echo-partial" not in state.step_results
assert "plan" in state.step_results
def test_switch_workflow(self, project_dir):
"""Test switch step type in a workflow."""
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
from specify_cli.workflows.base import RunStatus
yaml_str = """
schema_version: "1.0"
workflow:
id: "switch-test"
name: "Switch Test"
version: "1.0.0"
inputs:
action:
type: string
default: "plan"
steps:
- id: route
type: switch
expression: "{{ inputs.action }}"
cases:
specify:
- id: do-specify
type: shell
run: "echo specify"
plan:
- id: do-plan
type: shell
run: "echo plan"
default:
- id: do-default
type: shell
run: "echo default"
"""
definition = WorkflowDefinition.from_string(yaml_str)
engine = WorkflowEngine(project_dir)
state = engine.execute(definition)
assert state.status == RunStatus.COMPLETED
assert "do-plan" in state.step_results
assert "do-specify" not in state.step_results
class TestResumeWithInputs:
"""Test that `workflow resume` can accept updated workflow inputs."""
_WF_CMD = """
schema_version: "1.0"
workflow:
id: "resume-cmd-wf"
name: "Resume Cmd WF"
version: "1.0.0"
inputs:
cmd:
type: string
default: "exit 1"
steps:
- id: s
type: shell
run: "{{ inputs.cmd }}"
"""
_WF_NUM = """
schema_version: "1.0"
workflow:
id: "resume-num-wf"
name: "Resume Num WF"
version: "1.0.0"
inputs:
count:
type: number
default: 1
steps:
- id: gate
type: gate
message: "Review"
options: [approve, reject]
"""
def _engine(self, project_dir):
from specify_cli.workflows.engine import WorkflowEngine
return WorkflowEngine(project_dir)
def test_resume_with_input_reruns_step_with_new_value(self, project_dir):
from specify_cli.workflows.engine import WorkflowDefinition
from specify_cli.workflows.base import RunStatus
definition = WorkflowDefinition.from_string(self._WF_CMD)
engine = self._engine(project_dir)
state = engine.execute(definition)
assert state.status == RunStatus.FAILED # "exit 1" fails
resumed = engine.resume(state.run_id, {"cmd": "exit 0"})
assert resumed.status == RunStatus.COMPLETED
assert resumed.inputs["cmd"] == "exit 0"
def test_resume_without_input_preserves_inputs(self, project_dir):
from specify_cli.workflows.engine import WorkflowDefinition
from specify_cli.workflows.base import RunStatus
definition = WorkflowDefinition.from_string(self._WF_CMD)
engine = self._engine(project_dir)
state = engine.execute(definition)
assert state.status == RunStatus.FAILED
resumed = engine.resume(state.run_id)
assert resumed.status == RunStatus.FAILED # still "exit 1"
assert resumed.inputs["cmd"] == "exit 1"
def test_resume_merges_and_coerces_typed_input(self, project_dir):
import json as _json
from specify_cli.workflows.engine import WorkflowDefinition
from specify_cli.workflows.base import RunStatus
definition = WorkflowDefinition.from_string(self._WF_NUM)
engine = self._engine(project_dir)
state = engine.execute(definition)
assert state.status == RunStatus.PAUSED
resumed = engine.resume(state.run_id, {"count": "5"})
assert resumed.inputs["count"] == 5 # coerced string -> number
inputs_file = (
project_dir / ".specify" / "workflows" / "runs" / state.run_id / "inputs.json"
)
assert _json.loads(inputs_file.read_text())["inputs"]["count"] == 5
def test_resume_invalid_typed_input_raises(self, project_dir):
from specify_cli.workflows.engine import WorkflowDefinition
definition = WorkflowDefinition.from_string(self._WF_NUM)
engine = self._engine(project_dir)
state = engine.execute(definition)
with pytest.raises(ValueError):
engine.resume(state.run_id, {"count": "not-a-number"})
def test_cli_resume_input_invalid_format_errors(self, project_dir):
from typer.testing import CliRunner
from unittest.mock import patch
from specify_cli import app
from specify_cli.workflows.engine import WorkflowDefinition
definition = WorkflowDefinition.from_string(self._WF_NUM)
state = self._engine(project_dir).execute(definition)
runner = CliRunner()
with patch.object(Path, "cwd", return_value=project_dir):
result = runner.invoke(
app, ["workflow", "resume", state.run_id, "--input", "bogus"]
)
assert result.exit_code == 1
assert "Invalid input format" in result.stdout