"""Tests for the workflow engine subsystem.

Covers:
- Step registry & auto-discovery
- Base classes (StepBase, StepContext, StepResult)
- Expression engine
- All 10 built-in step types
- Workflow definition loading & validation
- Workflow engine execution & state persistence
- Workflow catalog & registry
"""

from __future__ import annotations

import json
import os
import shutil
import tempfile
from pathlib import Path

import pytest
import yaml


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------


@pytest.fixture
def temp_dir():
    """Create a temporary directory for tests."""
    tmpdir = tempfile.mkdtemp()
    yield Path(tmpdir)
    # Teardown: remove the directory once the requesting test has finished.
    shutil.rmtree(tmpdir)


@pytest.fixture
def project_dir(temp_dir):
    """Create a mock spec-kit project with .specify/ directory."""
    specify_dir = temp_dir / ".specify"
    specify_dir.mkdir()
    (specify_dir / "workflows").mkdir()
    return temp_dir


@pytest.fixture
def sample_workflow_yaml():
    """Return a valid minimal workflow YAML string."""
    # NOTE(review): the nesting below (``workflow``, ``inputs`` and ``steps``
    # as top-level keys) should be confirmed against the workflow schema.
    return """
schema_version: "1.0"
workflow:
  id: "test-workflow"
  name: "Test Workflow"
  version: "1.0.0"
  description: "A test workflow"

inputs:
  spec:
    type: string
    required: true
  scope:
    type: string
    default: "full"

steps:
  - id: step-one
    command: speckit.specify
    input:
      args: "{{ inputs.spec }}"

  - id: step-two
    command: speckit.plan
    input:
      args: "{{ steps.step-one.output.command }}"
"""


@pytest.fixture
def sample_workflow_file(project_dir, sample_workflow_yaml):
    """Write a sample workflow YAML to a file and return its path."""
    wf_dir = project_dir / ".specify" / "workflows" / "test-workflow"
    wf_dir.mkdir(parents=True, exist_ok=True)
    wf_path = wf_dir / "workflow.yml"
    wf_path.write_text(sample_workflow_yaml, encoding="utf-8")
    return wf_path


# ===== Step Registry Tests =====


class TestStepRegistry:
    """Test STEP_REGISTRY and auto-discovery."""

    def test_registry_populated(self):
        """The registry holds at least ten step types."""
        from specify_cli.workflows import STEP_REGISTRY

        assert len(STEP_REGISTRY) >= 10

    def test_all_step_types_registered(self):
        """Every expected step type key is present in the registry."""
        from specify_cli.workflows import STEP_REGISTRY

        expected = {
            "command", "shell", "prompt", "gate", "if", "switch",
            "while", "do-while", "fan-out", "fan-in",
        }
        assert expected.issubset(set(STEP_REGISTRY.keys()))

    def test_get_step_type(self):
        """Looking up ``command`` returns a step whose type_key matches."""
        from specify_cli.workflows import get_step_type

        step = get_step_type("command")
        assert step is not None
        assert step.type_key == "command"

    def test_get_step_type_missing(self):
        """Looking up an unknown key returns ``None``."""
        from specify_cli.workflows import get_step_type

        assert get_step_type("nonexistent") is None

    def test_register_step_duplicate_raises(self):
        """Registering ``CommandStep`` again raises ``KeyError``."""
        from specify_cli.workflows import _register_step
        from specify_cli.workflows.steps.command import CommandStep

        with pytest.raises(KeyError, match="already registered"):
            _register_step(CommandStep())

    def test_register_step_empty_key_raises(self):
        """Registering a step with an empty ``type_key`` raises ``ValueError``."""
        from specify_cli.workflows import _register_step
        from specify_cli.workflows.base import StepBase, StepResult

        class EmptyStep(StepBase):
            type_key = ""

            def execute(self, config, context):
                return StepResult()

        with pytest.raises(ValueError, match="empty type_key"):
            _register_step(EmptyStep())


# ===== Base Classes Tests =====


class TestBaseClasses:
    """Test StepBase, StepContext, StepResult."""

    def test_step_context_defaults(self):
        """A bare ``StepContext`` has empty mappings and ``None`` defaults."""
        from specify_cli.workflows.base import StepContext

        ctx = StepContext()
        assert ctx.inputs == {}
        assert ctx.steps == {}
        assert ctx.item is None
        assert ctx.fan_in == {}
        assert ctx.default_integration is None

    def test_step_context_with_data(self):
        """Constructor keyword arguments are exposed as attributes."""
        from specify_cli.workflows.base import StepContext

        ctx = StepContext(
            inputs={"name": "test"},
            default_integration="claude",
            default_model="sonnet-4",
        )
        assert ctx.inputs == {"name": "test"}
        assert ctx.default_integration == "claude"
        assert ctx.default_model == "sonnet-4"

    def test_step_result_defaults(self):
        """A bare ``StepResult`` is COMPLETED with empty output and no error."""
        from specify_cli.workflows.base import StepResult, StepStatus

        result = StepResult()
        assert result.status == StepStatus.COMPLETED
        assert result.output == {}
        assert result.next_steps == []
        assert result.error is None

    def test_step_status_values(self):
        """Each ``StepStatus`` member compares equal to its lowercase name."""
        from specify_cli.workflows.base import StepStatus

        assert StepStatus.PENDING == "pending"
        assert StepStatus.RUNNING == "running"
        assert StepStatus.COMPLETED == "completed"
        assert StepStatus.FAILED == "failed"
        assert StepStatus.SKIPPED == "skipped"
        assert StepStatus.PAUSED == "paused"

    def test_run_status_values(self):
        """Each ``RunStatus`` member compares equal to its lowercase name."""
        from specify_cli.workflows.base import RunStatus

        assert RunStatus.CREATED == "created"
        assert RunStatus.RUNNING == "running"
        assert RunStatus.PAUSED == "paused"
        assert RunStatus.COMPLETED == "completed"
        assert RunStatus.FAILED == "failed"
        assert RunStatus.ABORTED == "aborted"


# ===== Expression Engine Tests =====


class TestExpressions:
    """Test sandboxed expression evaluator."""

    def test_simple_variable(self):
        """``inputs.<name>`` resolves to the matching input value."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        ctx = StepContext(inputs={"name": "login"})
        assert evaluate_expression("{{ inputs.name }}", ctx) == "login"

    def test_step_output_reference(self):
        """``steps.<id>.output.<key>`` resolves to a prior step's output."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        ctx = StepContext(
            steps={"specify": {"output": {"file": "spec.md"}}}
        )
        assert evaluate_expression("{{ steps.specify.output.file }}", ctx) == "spec.md"

    def test_string_interpolation(self):
        """An expression embedded in surrounding text is interpolated."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        ctx = StepContext(inputs={"name": "login"})
        result = evaluate_expression("Feature: {{ inputs.name }} done", ctx)
        assert result == "Feature: login done"

    def test_comparison_equals(self):
        """``==`` yields real booleans for matching and non-matching values."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        ctx = StepContext(inputs={"scope": "full"})
        assert evaluate_expression("{{ inputs.scope == 'full' }}", ctx) is True
        assert evaluate_expression("{{ inputs.scope == 'partial' }}", ctx) is False

    def test_comparison_not_equals(self):
        """``!=`` works against the output of a step with a hyphenated id."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        ctx = StepContext(
            steps={"run-tests": {"output": {"exit_code": 1}}}
        )
        result = evaluate_expression("{{ steps.run-tests.output.exit_code != 0 }}", ctx)
        assert result is True

    def test_numeric_comparison(self):
        """``>`` and ``<`` compare numeric step outputs."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        ctx = StepContext(
            steps={"plan": {"output": {"task_count": 7}}}
        )
        assert evaluate_expression("{{ steps.plan.output.task_count > 5 }}", ctx) is True
        assert evaluate_expression("{{ steps.plan.output.task_count < 5 }}", ctx) is False

    def test_boolean_and(self):
        """``and`` of two true inputs is ``True``."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        ctx = StepContext(inputs={"a": True, "b": True})
        assert evaluate_expression("{{ inputs.a and inputs.b }}", ctx) is True

    def test_boolean_or(self):
        """``or`` of a false and a true input is ``True``."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        ctx = StepContext(inputs={"a": False, "b": True})
        assert evaluate_expression("{{ inputs.a or inputs.b }}", ctx) is True

    def test_filter_default(self):
        """The ``default`` filter supplies a value for a missing input."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        ctx = StepContext()
        assert evaluate_expression("{{ inputs.missing | default('fallback') }}", ctx) == "fallback"

    def test_filter_join(self):
        """The ``join`` filter concatenates a list with the separator."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        ctx = StepContext(inputs={"tags": ["a", "b", "c"]})
        assert evaluate_expression("{{ inputs.tags | join(', ') }}", ctx) == "a, b, c"

    def test_filter_contains(self):
        """The ``contains`` filter reports a substring match as ``True``."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        ctx = StepContext(inputs={"text": "hello world"})
        assert evaluate_expression("{{ inputs.text | contains('world') }}", ctx) is True

    def test_condition_evaluation(self):
        """``evaluate_condition`` is True for a true input, False if missing."""
        from specify_cli.workflows.expressions import evaluate_condition
        from specify_cli.workflows.base import StepContext

        ctx = StepContext(inputs={"ready": True})
        assert evaluate_condition("{{ inputs.ready }}", ctx) is True
        assert evaluate_condition("{{ inputs.missing }}", ctx) is False

    def test_non_string_passthrough(self):
        """Non-string values are returned unchanged."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        ctx = StepContext()
        assert evaluate_expression(42, ctx) == 42
        assert evaluate_expression(None, ctx) is None

    def test_string_literal(self):
        """A quoted literal evaluates to the unquoted string."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        ctx = StepContext()
        assert evaluate_expression("{{ 'hello' }}", ctx) == "hello"

    def test_numeric_literal(self):
        """A numeric literal evaluates to a number."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        ctx = StepContext()
        assert evaluate_expression("{{ 42 }}", ctx) == 42

    def test_boolean_literal(self):
        """Lowercase ``true`` / ``false`` evaluate to Python booleans."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        ctx = StepContext()
        assert evaluate_expression("{{ true }}", ctx) is True
        assert evaluate_expression("{{ false }}", ctx) is False

    def test_list_indexing(self):
        """``[0]`` indexes into a list and further attribute access works."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        ctx = StepContext(
            steps={"tasks": {"output": {"task_list": [{"file": "a.md"}, {"file": "b.md"}]}}}
        )
        result = evaluate_expression("{{ steps.tasks.output.task_list[0].file }}", ctx)
        assert result == "a.md"

    def test_context_run_id_resolves(self):
        """``{{ context.run_id }}`` resolves to ``StepContext.run_id``.

        Locks the contract from issue #2590: workflow templates can
        reference the engine-assigned run id for telemetry, artifact
        metadata, or per-run scratch isolation.
        """
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        ctx = StepContext(run_id="a1b2c3d4")
        assert evaluate_expression("{{ context.run_id }}", ctx) == "a1b2c3d4"

    def test_context_run_id_defaults_to_empty_when_unset(self):
        """``{{ context.run_id }}`` resolves to ``""`` when no run is active
        (dry-run, validation, ad-hoc evaluator usage) rather than raising —
        workflows referencing the variable never error outside a run context.
        """
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        # No run_id set on the context.
        ctx = StepContext()
        assert evaluate_expression("{{ context.run_id }}", ctx) == ""

    def test_context_run_id_string_interpolation(self):
        """Run id interpolates inside a larger template string — the common
        pattern for stamping shell commands and artifact paths with the
        run id.
        """
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        ctx = StepContext(run_id="deadbeef")
        result = evaluate_expression("RUN_ID={{ context.run_id }}", ctx)
        assert result == "RUN_ID=deadbeef"


# ===== Integration Dispatch Tests =====


class TestBuildExecArgs:
    """Test build_exec_args for CLI-based integrations."""

    def test_claude_exec_args(self):
        """Claude args start ``claude -p <prompt>`` and carry model/format flags."""
        from specify_cli.integrations.claude import ClaudeIntegration

        impl = ClaudeIntegration()
        args = impl.build_exec_args("do stuff", model="sonnet-4")
        assert args[0] == "claude"
        assert args[1] == "-p"
        assert args[2] == "do stuff"
        assert "--model" in args
        assert "sonnet-4" in args
        assert "--output-format" in args

    def test_gemini_exec_args(self):
        """Gemini args start ``gemini -p`` and pass the model via ``-m``."""
        from specify_cli.integrations.gemini import GeminiIntegration

        impl = GeminiIntegration()
        args = impl.build_exec_args("do stuff", model="gemini-2.5-pro")
        assert args[0] == "gemini"
        assert args[1] == "-p"
        assert "-m" in args
        assert "gemini-2.5-pro" in args

    def test_codex_exec_args(self):
        """Codex args start ``codex exec <prompt>`` and include ``--json``."""
        from specify_cli.integrations.codex import CodexIntegration

        impl = CodexIntegration()
        args = impl.build_exec_args("do stuff")
        assert args[0] == "codex"
        assert args[1] == "exec"
        assert args[2] == "do stuff"
        assert "--json" in args

    def test_copilot_exec_args(self, monkeypatch):
        """With neither env var set, Copilot args include ``--yolo``."""
        monkeypatch.delenv("SPECKIT_COPILOT_ALLOW_ALL_TOOLS", raising=False)
        monkeypatch.delenv("SPECKIT_ALLOW_ALL_TOOLS", raising=False)
        from specify_cli.integrations.copilot import CopilotIntegration

        impl = CopilotIntegration()
        args = impl.build_exec_args("do stuff", model="claude-sonnet-4-20250514")
        # The expected executable name differs on Windows (``os.name == "nt"``).
        expected_exec = "copilot.cmd" if os.name == "nt" else "copilot"
        assert args[0] == expected_exec
        assert "-p" in args
        assert "--yolo" in args
        assert "--model" in args

    def test_copilot_new_env_var_disables_yolo(self, monkeypatch):
        """``SPECKIT_COPILOT_ALLOW_ALL_TOOLS=0`` removes ``--yolo``."""
        monkeypatch.setenv("SPECKIT_COPILOT_ALLOW_ALL_TOOLS", "0")
        monkeypatch.delenv("SPECKIT_ALLOW_ALL_TOOLS", raising=False)
        from specify_cli.integrations.copilot import CopilotIntegration

        impl = CopilotIntegration()
        args = impl.build_exec_args("do stuff")
        assert "--yolo" not in args

    def test_copilot_deprecated_env_var_still_honoured(self, monkeypatch):
        """The old ``SPECKIT_ALLOW_ALL_TOOLS=0`` still works but warns."""
        monkeypatch.delenv("SPECKIT_COPILOT_ALLOW_ALL_TOOLS", raising=False)
        monkeypatch.setenv("SPECKIT_ALLOW_ALL_TOOLS", "0")
        import warnings
        from specify_cli.integrations.copilot import CopilotIntegration

        impl = CopilotIntegration()
        with warnings.catch_warnings(record=True) as w:
            # Record every warning so the deprecation notice cannot be
            # filtered out before the assertion below inspects it.
            warnings.simplefilter("always")
            args = impl.build_exec_args("do stuff")
        assert "--yolo" not in args
        assert any(
            "SPECKIT_ALLOW_ALL_TOOLS is deprecated" in str(x.message)
            and issubclass(x.category, UserWarning)
            for x in w
        )

    def test_copilot_new_env_var_takes_precedence(self, monkeypatch):
        """When both env vars are set, the new one decides (``--yolo`` kept)."""
        monkeypatch.setenv("SPECKIT_COPILOT_ALLOW_ALL_TOOLS", "1")
        monkeypatch.setenv("SPECKIT_ALLOW_ALL_TOOLS", "0")
        from specify_cli.integrations.copilot import CopilotIntegration

        impl = CopilotIntegration()
        args = impl.build_exec_args("do stuff")
        assert "--yolo" in args

    def test_ide_only_returns_none(self):
        """The Windsurf integration returns ``None`` from build_exec_args."""
        from specify_cli.integrations.windsurf import WindsurfIntegration

        impl = WindsurfIntegration()
        assert impl.build_exec_args("test") is None

    def test_no_model_omits_flag(self):
        """``model=None`` leaves ``--model`` out of the Claude args."""
        from specify_cli.integrations.claude import ClaudeIntegration

        impl = ClaudeIntegration()
        args = impl.build_exec_args("do stuff", model=None)
        assert "--model" not in args

    def test_no_json_omits_flag(self):
        """``output_json=False`` leaves ``--output-format`` out of the args."""
        from specify_cli.integrations.claude import ClaudeIntegration

        impl = ClaudeIntegration()
        args = impl.build_exec_args("do stuff", output_json=False)
        assert "--output-format" not in args

    def test_rovodev_exec_args(self):
        """Rovodev args are ``acli rovodev run <prompt>`` plus a schema flag."""
        from specify_cli.integrations.rovodev import RovodevIntegration

        impl = RovodevIntegration()
        args = impl.build_exec_args("/speckit.plan add OAuth")
        assert args[0:3] == ["acli", "rovodev", "run"]
        assert args[3] == "/speckit.plan add OAuth"
        assert "--output-schema" in args


# ===== Step Type Tests =====
class TestCommandStep: """Test the command step type.""" def test_execute_basic(self): from unittest.mock import patch from specify_cli.workflows.steps.command import CommandStep from specify_cli.workflows.base import StepContext, StepStatus step = CommandStep() ctx = StepContext( inputs={"name": "login"}, default_integration="claude", ) config = { "id": "test", "command": "speckit.specify", "input": {"args": "{{ inputs.name }}"}, } with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None): result = step.execute(config, ctx) assert result.status == StepStatus.FAILED assert result.output["command"] == "speckit.specify" assert result.output["integration"] == "claude" assert result.output["input"]["args"] == "login" def test_try_dispatch_resolves_rovodev_via_acli(self, tmp_path): """When acli is installed, rovodev dispatch succeeds via acli.""" from unittest.mock import patch, MagicMock from specify_cli.workflows.steps.command import CommandStep from specify_cli.workflows.base import StepContext, StepStatus step = CommandStep() ctx = StepContext( default_integration="rovodev", project_root=str(tmp_path), ) config = { "id": "test", "command": "speckit.plan", "input": {"args": "add OAuth"}, } mock_result = MagicMock() mock_result.returncode = 0 mock_result.stdout = "" mock_result.stderr = "" with patch("specify_cli.workflows.steps.command.shutil.which", lambda name: "/usr/bin/acli" if name == "acli" else None), \ patch("subprocess.run", return_value=mock_result): result = step.execute(config, ctx) assert result.status == StepStatus.COMPLETED assert result.output["dispatched"] is True assert result.output["exit_code"] == 0 def test_validate_missing_command(self): from specify_cli.workflows.steps.command import CommandStep step = CommandStep() errors = step.validate({"id": "test"}) assert any("missing 'command'" in e for e in errors) def test_step_override_integration(self): from unittest.mock import patch from specify_cli.workflows.steps.command 
import CommandStep from specify_cli.workflows.base import StepContext step = CommandStep() ctx = StepContext(default_integration="claude") config = { "id": "test", "command": "speckit.plan", "integration": "gemini", "input": {}, } with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None): result = step.execute(config, ctx) assert result.output["integration"] == "gemini" def test_step_override_model(self): from unittest.mock import patch from specify_cli.workflows.steps.command import CommandStep from specify_cli.workflows.base import StepContext step = CommandStep() ctx = StepContext(default_model="sonnet-4") config = { "id": "test", "command": "speckit.implement", "model": "opus-4", "input": {}, } with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None): result = step.execute(config, ctx) assert result.output["model"] == "opus-4" def test_options_merge(self): from unittest.mock import patch from specify_cli.workflows.steps.command import CommandStep from specify_cli.workflows.base import StepContext step = CommandStep() ctx = StepContext(default_options={"max-tokens": 8000}) config = { "id": "test", "command": "speckit.plan", "options": {"thinking-budget": 32768}, "input": {}, } with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None): result = step.execute(config, ctx) assert result.output["options"]["max-tokens"] == 8000 assert result.output["options"]["thinking-budget"] == 32768 def test_dispatch_not_attempted_without_cli(self): """When the CLI tool is not installed, step should fail.""" from unittest.mock import patch from specify_cli.workflows.steps.command import CommandStep from specify_cli.workflows.base import StepContext, StepStatus step = CommandStep() ctx = StepContext( inputs={"name": "login"}, default_integration="claude", project_root="/tmp", ) config = { "id": "test", "command": "speckit.specify", "input": {"args": "{{ inputs.name }}"}, } with 
patch("specify_cli.workflows.steps.command.shutil.which", return_value=None): result = step.execute(config, ctx) assert result.status == StepStatus.FAILED assert result.output["dispatched"] is False assert result.error is not None def test_dispatch_with_mock_cli(self, tmp_path, monkeypatch): """When the CLI is installed, dispatch invokes the command by name.""" from unittest.mock import patch, MagicMock from specify_cli.workflows.steps.command import CommandStep from specify_cli.workflows.base import StepContext, StepStatus step = CommandStep() ctx = StepContext( inputs={"name": "login"}, default_integration="claude", project_root=str(tmp_path), ) config = { "id": "test", "command": "speckit.specify", "input": {"args": "{{ inputs.name }}"}, } mock_result = MagicMock() mock_result.returncode = 0 mock_result.stdout = '{"result": "done"}' mock_result.stderr = "" with patch("specify_cli.workflows.steps.command.shutil.which", return_value="/usr/local/bin/claude"), \ patch("specify_cli.integrations.base.shutil.which", return_value="/usr/local/bin/claude"), \ patch("subprocess.run", return_value=mock_result) as mock_run: result = step.execute(config, ctx) assert result.status == StepStatus.COMPLETED assert result.output["dispatched"] is True assert result.output["exit_code"] == 0 # Verify the CLI was called with the resolved path (via shutil.which, # which honors PATHEXT for ``.cmd``/``.bat`` shims on Windows), then # ``-p`` and the skill invocation. 
call_args = mock_run.call_args assert call_args[0][0][0] == "/usr/local/bin/claude" assert call_args[0][0][1] == "-p" # Claude is a SkillsIntegration so uses /speckit-specify assert "/speckit-specify login" in call_args[0][0][2] def test_dispatch_failure_returns_failed_status(self, tmp_path): """When the CLI exits non-zero, the step should fail.""" from unittest.mock import patch, MagicMock from specify_cli.workflows.steps.command import CommandStep from specify_cli.workflows.base import StepContext, StepStatus step = CommandStep() ctx = StepContext( inputs={}, default_integration="claude", project_root=str(tmp_path), ) config = { "id": "test", "command": "speckit.specify", "input": {"args": "test"}, } mock_result = MagicMock() mock_result.returncode = 1 mock_result.stdout = "" mock_result.stderr = "API error" with patch("specify_cli.workflows.steps.command.shutil.which", return_value="/usr/local/bin/claude"), \ patch("specify_cli.integrations.base.shutil.which", return_value="/usr/local/bin/claude"), \ patch("subprocess.run", return_value=mock_result): result = step.execute(config, ctx) assert result.status == StepStatus.FAILED assert result.output["dispatched"] is True assert result.output["exit_code"] == 1 class TestPromptStep: """Test the prompt step type.""" def test_execute_basic(self): from unittest.mock import patch from specify_cli.workflows.steps.prompt import PromptStep from specify_cli.workflows.base import StepContext, StepStatus step = PromptStep() ctx = StepContext( inputs={"file": "auth.py"}, default_integration="claude", ) config = { "id": "review", "type": "prompt", "prompt": "Review {{ inputs.file }} for security issues", } with patch("specify_cli.workflows.steps.prompt.shutil.which", return_value=None): result = step.execute(config, ctx) assert result.status == StepStatus.FAILED assert result.output["prompt"] == "Review auth.py for security issues" assert result.output["integration"] == "claude" assert result.output["dispatched"] is False def 
test_execute_with_step_integration(self): from unittest.mock import patch from specify_cli.workflows.steps.prompt import PromptStep from specify_cli.workflows.base import StepContext step = PromptStep() ctx = StepContext(default_integration="claude") config = { "id": "review", "type": "prompt", "prompt": "Summarize the codebase", "integration": "gemini", } with patch("specify_cli.workflows.steps.prompt.shutil.which", return_value=None): result = step.execute(config, ctx) assert result.output["integration"] == "gemini" def test_execute_with_model(self): from unittest.mock import patch from specify_cli.workflows.steps.prompt import PromptStep from specify_cli.workflows.base import StepContext step = PromptStep() ctx = StepContext(default_integration="claude", default_model="sonnet-4") config = { "id": "review", "type": "prompt", "prompt": "hello", "model": "opus-4", } with patch("specify_cli.workflows.steps.prompt.shutil.which", return_value=None): result = step.execute(config, ctx) assert result.output["model"] == "opus-4" def test_try_dispatch_resolves_rovodev_via_acli(self, tmp_path): """When acli is installed, rovodev prompt dispatch succeeds via acli.""" from unittest.mock import patch, MagicMock from specify_cli.workflows.steps.prompt import PromptStep from specify_cli.workflows.base import StepContext, StepStatus step = PromptStep() ctx = StepContext( default_integration="rovodev", project_root=str(tmp_path), ) config = { "id": "test", "type": "prompt", "prompt": "Explain this code", } mock_result = MagicMock() mock_result.returncode = 0 mock_result.stdout = "" mock_result.stderr = "" with patch("specify_cli.workflows.steps.prompt.shutil.which", lambda name: "/usr/bin/acli" if name == "acli" else None), \ patch("subprocess.run", return_value=mock_result): result = step.execute(config, ctx) assert result.status == StepStatus.COMPLETED assert result.output["dispatched"] is True assert result.output["exit_code"] == 0 def test_dispatch_with_mock_cli(self, 
tmp_path): from unittest.mock import patch, MagicMock from specify_cli.workflows.steps.prompt import PromptStep from specify_cli.workflows.base import StepContext, StepStatus step = PromptStep() ctx = StepContext( default_integration="claude", project_root=str(tmp_path), ) config = { "id": "ask", "type": "prompt", "prompt": "Explain this code", } mock_result = MagicMock() mock_result.returncode = 0 mock_result.stdout = "Here is the explanation" mock_result.stderr = "" with patch("specify_cli.workflows.steps.prompt.shutil.which", return_value="/usr/local/bin/claude"), \ patch("subprocess.run", return_value=mock_result): result = step.execute(config, ctx) assert result.status == StepStatus.COMPLETED assert result.output["dispatched"] is True assert result.output["exit_code"] == 0 def test_validate_missing_prompt(self): from specify_cli.workflows.steps.prompt import PromptStep step = PromptStep() errors = step.validate({"id": "test"}) assert any("missing 'prompt'" in e for e in errors) def test_validate_valid(self): from specify_cli.workflows.steps.prompt import PromptStep step = PromptStep() errors = step.validate({"id": "test", "prompt": "do something"}) assert errors == [] class TestShellStep: """Test the shell step type.""" def test_execute_echo(self): from specify_cli.workflows.steps.shell import ShellStep from specify_cli.workflows.base import StepContext, StepStatus step = ShellStep() ctx = StepContext() config = {"id": "test", "run": "echo hello"} result = step.execute(config, ctx) assert result.status == StepStatus.COMPLETED assert result.output["exit_code"] == 0 assert "hello" in result.output["stdout"] def test_execute_failure(self): from specify_cli.workflows.steps.shell import ShellStep from specify_cli.workflows.base import StepContext, StepStatus step = ShellStep() ctx = StepContext() config = {"id": "test", "run": "exit 1"} result = step.execute(config, ctx) assert result.status == StepStatus.FAILED assert result.output["exit_code"] == 1 assert 
result.error is not None def test_validate_missing_run(self): from specify_cli.workflows.steps.shell import ShellStep step = ShellStep() errors = step.validate({"id": "test"}) assert any("missing 'run'" in e for e in errors) class TestGateStep: """Test the gate step type.""" def test_execute_returns_paused(self): from specify_cli.workflows.steps.gate import GateStep from specify_cli.workflows.base import StepContext, StepStatus step = GateStep() ctx = StepContext() config = { "id": "review", "message": "Review the spec.", "options": ["approve", "reject"], "on_reject": "abort", } result = step.execute(config, ctx) assert result.status == StepStatus.PAUSED assert result.output["message"] == "Review the spec." assert result.output["options"] == ["approve", "reject"] def test_validate_missing_message(self): from specify_cli.workflows.steps.gate import GateStep step = GateStep() errors = step.validate({"id": "test", "options": ["approve"]}) assert any("missing 'message'" in e for e in errors) def test_validate_invalid_on_reject(self): from specify_cli.workflows.steps.gate import GateStep step = GateStep() errors = step.validate({ "id": "test", "message": "Review", "on_reject": "invalid", }) assert any("on_reject" in e for e in errors) class TestIfThenStep: """Test the if/then/else step type.""" def test_execute_then_branch(self): from specify_cli.workflows.steps.if_then import IfThenStep from specify_cli.workflows.base import StepContext step = IfThenStep() ctx = StepContext(inputs={"scope": "full"}) config = { "id": "check", "condition": "{{ inputs.scope == 'full' }}", "then": [{"id": "a", "command": "speckit.tasks"}], "else": [{"id": "b", "command": "speckit.plan"}], } result = step.execute(config, ctx) assert result.output["condition_result"] is True assert len(result.next_steps) == 1 assert result.next_steps[0]["id"] == "a" def test_execute_else_branch(self): from specify_cli.workflows.steps.if_then import IfThenStep from specify_cli.workflows.base import 
StepContext step = IfThenStep() ctx = StepContext(inputs={"scope": "backend"}) config = { "id": "check", "condition": "{{ inputs.scope == 'full' }}", "then": [{"id": "a", "command": "speckit.tasks"}], "else": [{"id": "b", "command": "speckit.plan"}], } result = step.execute(config, ctx) assert result.output["condition_result"] is False assert result.next_steps[0]["id"] == "b" def test_validate_missing_condition(self): from specify_cli.workflows.steps.if_then import IfThenStep step = IfThenStep() errors = step.validate({"id": "test", "then": []}) assert any("missing 'condition'" in e for e in errors) class TestSwitchStep: """Test the switch step type.""" def test_execute_matches_case(self): from specify_cli.workflows.steps.switch import SwitchStep from specify_cli.workflows.base import StepContext step = SwitchStep() ctx = StepContext( steps={"review": {"output": {"choice": "approve"}}} ) config = { "id": "route", "expression": "{{ steps.review.output.choice }}", "cases": { "approve": [{"id": "plan", "command": "speckit.plan"}], "reject": [{"id": "log", "type": "shell", "run": "echo rejected"}], }, "default": [{"id": "abort", "type": "gate", "message": "Unknown"}], } result = step.execute(config, ctx) assert result.output["matched_case"] == "approve" assert result.next_steps[0]["id"] == "plan" def test_execute_falls_to_default(self): from specify_cli.workflows.steps.switch import SwitchStep from specify_cli.workflows.base import StepContext step = SwitchStep() ctx = StepContext( steps={"review": {"output": {"choice": "unknown"}}} ) config = { "id": "route", "expression": "{{ steps.review.output.choice }}", "cases": { "approve": [{"id": "plan", "command": "speckit.plan"}], }, "default": [{"id": "fallback", "type": "gate", "message": "Fallback"}], } result = step.execute(config, ctx) assert result.output["matched_case"] == "__default__" assert result.next_steps[0]["id"] == "fallback" def test_execute_no_default_no_match(self): from specify_cli.workflows.steps.switch 
import SwitchStep from specify_cli.workflows.base import StepContext step = SwitchStep() ctx = StepContext( steps={"review": {"output": {"choice": "other"}}} ) config = { "id": "route", "expression": "{{ steps.review.output.choice }}", "cases": { "approve": [{"id": "plan", "command": "speckit.plan"}], }, } result = step.execute(config, ctx) assert result.output["matched_case"] == "__default__" assert result.next_steps == [] def test_validate_missing_expression(self): from specify_cli.workflows.steps.switch import SwitchStep step = SwitchStep() errors = step.validate({"id": "test", "cases": {}}) assert any("missing 'expression'" in e for e in errors) def test_validate_invalid_cases_and_default(self): from specify_cli.workflows.steps.switch import SwitchStep step = SwitchStep() errors = step.validate({ "id": "test", "expression": "{{ x }}", "cases": {"a": "not-a-list"}, "default": "also-bad", }) assert any("case 'a' must be a list" in e for e in errors) assert any("'default' must be a list" in e for e in errors) class TestWhileStep: """Test the while loop step type.""" def test_execute_condition_true(self): from specify_cli.workflows.steps.while_loop import WhileStep from specify_cli.workflows.base import StepContext step = WhileStep() ctx = StepContext( steps={"run-tests": {"output": {"exit_code": 1}}} ) config = { "id": "retry", "condition": "{{ steps.run-tests.output.exit_code != 0 }}", "max_iterations": 5, "steps": [{"id": "fix", "command": "speckit.implement"}], } result = step.execute(config, ctx) assert result.output["condition_result"] is True assert len(result.next_steps) == 1 def test_execute_condition_false(self): from specify_cli.workflows.steps.while_loop import WhileStep from specify_cli.workflows.base import StepContext step = WhileStep() ctx = StepContext( steps={"run-tests": {"output": {"exit_code": 0}}} ) config = { "id": "retry", "condition": "{{ steps.run-tests.output.exit_code != 0 }}", "max_iterations": 5, "steps": [{"id": "fix", "command": 
"speckit.implement"}], } result = step.execute(config, ctx) assert result.output["condition_result"] is False assert result.next_steps == [] def test_validate_missing_fields(self): from specify_cli.workflows.steps.while_loop import WhileStep step = WhileStep() errors = step.validate({"id": "test", "steps": []}) assert any("missing 'condition'" in e for e in errors) # max_iterations is optional (defaults to 10) def test_validate_invalid_max_iterations(self): from specify_cli.workflows.steps.while_loop import WhileStep step = WhileStep() errors = step.validate({"id": "test", "condition": "{{ true }}", "max_iterations": 0, "steps": []}) assert any("must be an integer >= 1" in e for e in errors) class TestDoWhileStep: """Test the do-while loop step type.""" def test_execute_always_runs_once(self): from specify_cli.workflows.steps.do_while import DoWhileStep from specify_cli.workflows.base import StepContext step = DoWhileStep() ctx = StepContext() config = { "id": "cycle", "condition": "{{ false }}", "max_iterations": 3, "steps": [{"id": "refine", "command": "speckit.specify"}], } result = step.execute(config, ctx) assert len(result.next_steps) == 1 assert result.output["loop_type"] == "do-while" assert result.output["condition"] == "{{ false }}" def test_execute_with_true_condition(self): from specify_cli.workflows.steps.do_while import DoWhileStep from specify_cli.workflows.base import StepContext step = DoWhileStep() ctx = StepContext() config = { "id": "cycle", "condition": "{{ true }}", "max_iterations": 5, "steps": [{"id": "work", "command": "speckit.plan"}], } result = step.execute(config, ctx) # Body always executes on first call regardless of condition assert len(result.next_steps) == 1 assert result.output["max_iterations"] == 5 def test_execute_empty_steps(self): from specify_cli.workflows.steps.do_while import DoWhileStep from specify_cli.workflows.base import StepContext step = DoWhileStep() ctx = StepContext() config = { "id": "empty", "condition": "{{ 
false }}", "max_iterations": 1, "steps": [], } result = step.execute(config, ctx) assert result.next_steps == [] assert result.status.value == "completed" def test_validate_missing_fields(self): from specify_cli.workflows.steps.do_while import DoWhileStep step = DoWhileStep() errors = step.validate({"id": "test", "steps": []}) assert any("missing 'condition'" in e for e in errors) # max_iterations is optional (defaults to 10) def test_validate_steps_not_list(self): from specify_cli.workflows.steps.do_while import DoWhileStep step = DoWhileStep() errors = step.validate({ "id": "test", "condition": "{{ true }}", "max_iterations": 3, "steps": "not-a-list", }) assert any("'steps' must be a list" in e for e in errors) class TestFanOutStep: """Test the fan-out step type.""" def test_execute_with_items(self): from specify_cli.workflows.steps.fan_out import FanOutStep from specify_cli.workflows.base import StepContext step = FanOutStep() ctx = StepContext( steps={"tasks": {"output": {"task_list": [ {"file": "a.md"}, {"file": "b.md"}, ]}}} ) config = { "id": "parallel", "items": "{{ steps.tasks.output.task_list }}", "max_concurrency": 3, "step": {"id": "impl", "command": "speckit.implement"}, } result = step.execute(config, ctx) assert result.output["item_count"] == 2 assert result.output["max_concurrency"] == 3 def test_execute_non_list_items_resolves_empty(self): from specify_cli.workflows.steps.fan_out import FanOutStep from specify_cli.workflows.base import StepContext step = FanOutStep() ctx = StepContext() config = { "id": "parallel", "items": "{{ undefined_var }}", "step": {"id": "impl", "command": "speckit.implement"}, } result = step.execute(config, ctx) assert result.output["item_count"] == 0 assert result.output["items"] == [] def test_validate_missing_fields(self): from specify_cli.workflows.steps.fan_out import FanOutStep step = FanOutStep() errors = step.validate({"id": "test"}) assert any("missing 'items'" in e for e in errors) assert any("missing 'step'" in 
e for e in errors) def test_validate_step_not_mapping(self): from specify_cli.workflows.steps.fan_out import FanOutStep step = FanOutStep() errors = step.validate({ "id": "test", "items": "{{ x }}", "step": "not-a-dict", }) assert any("'step' must be a mapping" in e for e in errors) class TestFanInStep: """Test the fan-in step type.""" def test_execute_collects_results(self): from specify_cli.workflows.steps.fan_in import FanInStep from specify_cli.workflows.base import StepContext step = FanInStep() ctx = StepContext( steps={ "parallel": {"output": {"item_count": 2, "status": "done"}} } ) config = { "id": "collect", "wait_for": ["parallel"], "output": {}, } result = step.execute(config, ctx) assert len(result.output["results"]) == 1 assert result.output["results"][0]["item_count"] == 2 def test_execute_multiple_wait_for(self): from specify_cli.workflows.steps.fan_in import FanInStep from specify_cli.workflows.base import StepContext step = FanInStep() ctx = StepContext( steps={ "task-a": {"output": {"file": "a.md"}}, "task-b": {"output": {"file": "b.md"}}, } ) config = { "id": "collect", "wait_for": ["task-a", "task-b"], "output": {}, } result = step.execute(config, ctx) assert len(result.output["results"]) == 2 assert result.output["results"][0]["file"] == "a.md" assert result.output["results"][1]["file"] == "b.md" def test_execute_missing_wait_for_step(self): from specify_cli.workflows.steps.fan_in import FanInStep from specify_cli.workflows.base import StepContext step = FanInStep() ctx = StepContext(steps={}) config = { "id": "collect", "wait_for": ["nonexistent"], "output": {}, } result = step.execute(config, ctx) assert result.output["results"] == [{}] def test_validate_empty_wait_for(self): from specify_cli.workflows.steps.fan_in import FanInStep step = FanInStep() errors = step.validate({"id": "test", "wait_for": []}) assert any("non-empty list" in e for e in errors) def test_validate_wait_for_not_list(self): from specify_cli.workflows.steps.fan_in import 
FanInStep step = FanInStep() errors = step.validate({"id": "test", "wait_for": "not-a-list"}) assert any("non-empty list" in e for e in errors) # ===== Workflow Definition Tests ===== class TestWorkflowDefinition: """Test WorkflowDefinition loading and parsing.""" def test_from_yaml(self, sample_workflow_file): from specify_cli.workflows.engine import WorkflowDefinition definition = WorkflowDefinition.from_yaml(sample_workflow_file) assert definition.id == "test-workflow" assert definition.name == "Test Workflow" assert definition.version == "1.0.0" assert len(definition.steps) == 2 def test_from_string(self, sample_workflow_yaml): from specify_cli.workflows.engine import WorkflowDefinition definition = WorkflowDefinition.from_string(sample_workflow_yaml) assert definition.id == "test-workflow" assert len(definition.inputs) == 2 def test_from_string_invalid(self): from specify_cli.workflows.engine import WorkflowDefinition with pytest.raises(ValueError, match="must be a mapping"): WorkflowDefinition.from_string("- just a list") def test_inputs_parsed(self, sample_workflow_yaml): from specify_cli.workflows.engine import WorkflowDefinition definition = WorkflowDefinition.from_string(sample_workflow_yaml) assert "spec" in definition.inputs assert definition.inputs["spec"]["required"] is True assert definition.inputs["scope"]["default"] == "full" # ===== Workflow Validation Tests ===== class TestWorkflowValidation: """Test workflow validation.""" def test_valid_workflow(self, sample_workflow_yaml): from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(sample_workflow_yaml) errors = validate_workflow(definition) assert errors == [] def test_missing_id(self): from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(""" workflow: name: "Test" version: "1.0.0" steps: - id: step-one command: speckit.specify """) errors = 
validate_workflow(definition) assert any("workflow.id" in e for e in errors) def test_invalid_id_format(self): from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(""" workflow: id: "Invalid ID!" name: "Test" version: "1.0.0" steps: - id: step-one command: speckit.specify """) errors = validate_workflow(definition) assert any("lowercase alphanumeric" in e for e in errors) def test_no_steps(self): from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(""" workflow: id: "test" name: "Test" version: "1.0.0" steps: [] """) errors = validate_workflow(definition) assert any("no steps" in e.lower() for e in errors) def test_duplicate_step_ids(self): from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(""" workflow: id: "test" name: "Test" version: "1.0.0" steps: - id: same-id command: speckit.specify - id: same-id command: speckit.plan """) errors = validate_workflow(definition) assert any("Duplicate" in e for e in errors) def test_invalid_step_type(self): from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(""" workflow: id: "test" name: "Test" version: "1.0.0" steps: - id: bad type: nonexistent """) errors = validate_workflow(definition) assert any("invalid type" in e.lower() for e in errors) def test_nested_step_validation(self): from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(""" workflow: id: "test" name: "Test" version: "1.0.0" steps: - id: branch type: if condition: "{{ true }}" then: - id: nested-a command: speckit.specify else: - id: nested-b command: speckit.plan """) errors = validate_workflow(definition) assert errors == [] def test_invalid_input_type(self): from specify_cli.workflows.engine import 
WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(""" workflow: id: "test" name: "Test" version: "1.0.0" inputs: bad: type: array steps: - id: step-one command: speckit.specify """) errors = validate_workflow(definition) assert any("invalid type" in e.lower() for e in errors) # ===== Workflow Engine Tests ===== class TestWorkflowEngine: """Test WorkflowEngine execution.""" def test_load_from_file(self, sample_workflow_file, project_dir): from specify_cli.workflows.engine import WorkflowEngine engine = WorkflowEngine(project_dir) definition = engine.load_workflow(str(sample_workflow_file)) assert definition.id == "test-workflow" def test_load_from_installed_id(self, sample_workflow_file, project_dir): from specify_cli.workflows.engine import WorkflowEngine engine = WorkflowEngine(project_dir) definition = engine.load_workflow("test-workflow") assert definition.id == "test-workflow" def test_load_not_found(self, project_dir): from specify_cli.workflows.engine import WorkflowEngine engine = WorkflowEngine(project_dir) with pytest.raises(FileNotFoundError): engine.load_workflow("nonexistent") def test_execute_simple_workflow(self, project_dir): from unittest.mock import patch from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition from specify_cli.workflows.base import RunStatus yaml_str = """ schema_version: "1.0" workflow: id: "simple" name: "Simple" version: "1.0.0" integration: claude inputs: name: type: string default: "test" steps: - id: step-one command: speckit.specify input: args: "{{ inputs.name }}" """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None): state = engine.execute(definition, {"name": "login"}) assert state.status == RunStatus.FAILED assert "step-one" in state.step_results assert state.step_results["step-one"]["output"]["command"] == "speckit.specify" assert 
state.step_results["step-one"]["output"]["input"]["args"] == "login" def test_execute_with_gate_pauses(self, project_dir): from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition from specify_cli.workflows.base import RunStatus yaml_str = """ schema_version: "1.0" workflow: id: "gated" name: "Gated" version: "1.0.0" steps: - id: step-one type: shell run: "echo test" - id: gate type: gate message: "Review?" options: [approve, reject] on_reject: abort - id: step-two type: shell run: "echo done" """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.PAUSED assert "gate" in state.step_results assert state.step_results["gate"]["status"] == "paused" def test_execute_with_shell_step(self, project_dir): from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition from specify_cli.workflows.base import RunStatus yaml_str = """ schema_version: "1.0" workflow: id: "shell-test" name: "Shell Test" version: "1.0.0" steps: - id: echo type: shell run: "echo workflow-output" """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.COMPLETED assert "workflow-output" in state.step_results["echo"]["output"]["stdout"] def test_execute_with_if_then(self, project_dir): from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition from specify_cli.workflows.base import RunStatus yaml_str = """ schema_version: "1.0" workflow: id: "branching" name: "Branching" version: "1.0.0" inputs: scope: type: string default: "full" steps: - id: check type: if condition: "{{ inputs.scope == 'full' }}" then: - id: full-tasks type: shell run: "echo full" else: - id: partial-tasks type: shell run: "echo partial" """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) state = engine.execute(definition, 
{"scope": "full"}) assert state.status == RunStatus.COMPLETED assert "full-tasks" in state.step_results assert "partial-tasks" not in state.step_results def test_execute_missing_required_input(self, project_dir): from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition yaml_str = """ schema_version: "1.0" workflow: id: "needs-input" name: "Needs Input" version: "1.0.0" inputs: name: type: string required: true steps: - id: step-one command: speckit.specify input: args: "{{ inputs.name }}" """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) with pytest.raises(ValueError, match="Required input"): engine.execute(definition, {}) def test_integration_auto_default_uses_project_integration(self, project_dir): """`integration: auto` should resolve to .specify/integration.json's integration.""" from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition specify_dir = project_dir / ".specify" specify_dir.mkdir(parents=True, exist_ok=True) (specify_dir / "integration.json").write_text( json.dumps({"integration": "opencode", "version": "0.7.4"}), encoding="utf-8", ) definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "auto-default" name: "Auto Default" version: "1.0.0" inputs: integration: type: string default: "auto" """) engine = WorkflowEngine(project_dir) resolved = engine._resolve_inputs(definition, {}) assert resolved["integration"] == "opencode" def test_integration_auto_default_falls_back_when_no_integration_json(self, project_dir): """`integration: auto` should keep the literal "auto" when project state is missing. The engine itself must not invent an integration when ``.specify/integration.json`` is absent; any later validation or command resolution will handle an unresolved ``"auto"`` value. 
""" from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "auto-fallback" name: "Auto Fallback" version: "1.0.0" inputs: integration: type: string default: "auto" """) engine = WorkflowEngine(project_dir) resolved = engine._resolve_inputs(definition, {}) assert resolved["integration"] == "auto" def test_integration_explicit_input_overrides_auto(self, project_dir): """An explicit --input integration=X must win over `auto` even when integration.json exists.""" from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition specify_dir = project_dir / ".specify" specify_dir.mkdir(parents=True, exist_ok=True) (specify_dir / "integration.json").write_text( json.dumps({"integration": "opencode"}), encoding="utf-8", ) definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "explicit-wins" name: "Explicit Wins" version: "1.0.0" inputs: integration: type: string default: "auto" """) engine = WorkflowEngine(project_dir) resolved = engine._resolve_inputs(definition, {"integration": "claude"}) assert resolved["integration"] == "claude" def test_integration_explicit_auto_resolves_like_default(self, project_dir): """Passing ``integration=auto`` explicitly must resolve the sentinel, not pass it through as a literal — the workflow prompt advertises ``auto`` as a valid value, so the dispatch path must never see it. 
""" from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition specify_dir = project_dir / ".specify" specify_dir.mkdir(parents=True, exist_ok=True) (specify_dir / "integration.json").write_text( json.dumps({"integration": "opencode"}), encoding="utf-8", ) definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "explicit-auto" name: "Explicit Auto" version: "1.0.0" inputs: integration: type: string default: "auto" """) engine = WorkflowEngine(project_dir) resolved = engine._resolve_inputs(definition, {"integration": "auto"}) assert resolved["integration"] == "opencode" def test_integration_auto_ignores_malformed_integration_json(self, project_dir): """A malformed integration.json must not crash — fall back to the literal default.""" from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition specify_dir = project_dir / ".specify" specify_dir.mkdir(parents=True, exist_ok=True) (specify_dir / "integration.json").write_text("{not json", encoding="utf-8") definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "auto-malformed" name: "Auto Malformed" version: "1.0.0" inputs: integration: type: string default: "auto" """) engine = WorkflowEngine(project_dir) resolved = engine._resolve_inputs(definition, {}) assert resolved["integration"] == "auto" def test_integration_auto_ignores_non_utf8_integration_json(self, project_dir): """A non-UTF8 integration.json must not crash — fall back to the literal default.""" from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition specify_dir = project_dir / ".specify" specify_dir.mkdir(parents=True, exist_ok=True) # 0xFF is invalid as the leading byte of a UTF-8 sequence, so # ``Path.read_text(encoding="utf-8")`` raises UnicodeDecodeError. 
(specify_dir / "integration.json").write_bytes(b"\xff\xfe\x00\x00") definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "auto-non-utf8" name: "Auto Non UTF-8" version: "1.0.0" inputs: integration: type: string default: "auto" """) engine = WorkflowEngine(project_dir) resolved = engine._resolve_inputs(definition, {}) assert resolved["integration"] == "auto" def test_integration_auto_resolves_modern_normalized_state(self, project_dir): """`integration: auto` must resolve modern state files that record ``default_integration`` / ``installed_integrations`` and omit the legacy ``integration`` field.""" from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition specify_dir = project_dir / ".specify" specify_dir.mkdir(parents=True, exist_ok=True) (specify_dir / "integration.json").write_text( json.dumps( { "version": "0.8.3", "integration_state_schema": 1, "default_integration": "claude", "installed_integrations": ["claude", "copilot"], "integration_settings": {}, } ), encoding="utf-8", ) definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "auto-modern" name: "Auto Modern" version: "1.0.0" inputs: integration: type: string default: "auto" """) engine = WorkflowEngine(project_dir) resolved = engine._resolve_inputs(definition, {}) assert resolved["integration"] == "claude" def test_integration_auto_rejects_future_state_schema(self, project_dir): """`integration: auto` must not silently use a state file written by a newer CLI (``integration_state_schema`` greater than the current supported value); the resolver falls back to the literal default rather than guessing.""" from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition from specify_cli.integration_state import INTEGRATION_STATE_SCHEMA specify_dir = project_dir / ".specify" specify_dir.mkdir(parents=True, exist_ok=True) (specify_dir / "integration.json").write_text( json.dumps( { "version": "99.0.0", 
"integration_state_schema": INTEGRATION_STATE_SCHEMA + 1, "default_integration": "claude", "installed_integrations": ["claude"], "integration_settings": {}, } ), encoding="utf-8", ) definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "auto-future-schema" name: "Auto Future Schema" version: "1.0.0" inputs: integration: type: string default: "auto" """) engine = WorkflowEngine(project_dir) resolved = engine._resolve_inputs(definition, {}) assert resolved["integration"] == "auto" def test_default_value_is_validated_against_enum(self, project_dir): """Defaults must run through the same coercion/enum check as provided inputs.""" from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "default-enum" name: "Default Enum" version: "1.0.0" inputs: scope: type: string default: "not-in-enum" enum: ["full", "backend-only", "frontend-only"] """) engine = WorkflowEngine(project_dir) with pytest.raises(ValueError, match="not in allowed values"): engine._resolve_inputs(definition, {}) def test_default_value_is_coerced_to_declared_type(self, project_dir): """A numeric default declared as a string should still be coerced like a provided input.""" from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "default-coerce" name: "Default Coerce" version: "1.0.0" inputs: retries: type: number default: "3" """) engine = WorkflowEngine(project_dir) resolved = engine._resolve_inputs(definition, {}) assert resolved["retries"] == 3 assert isinstance(resolved["retries"], int) def test_validate_workflow_rejects_invalid_default(self): """Authoring-time validation should reject defaults that violate enum.""" from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(""" schema_version: "1.0" 
workflow: id: "bad-default" name: "Bad Default" version: "1.0.0" inputs: scope: type: string default: "not-in-enum" enum: ["full", "backend-only", "frontend-only"] steps: - id: noop type: gate message: "noop" options: [approve] """) errors = validate_workflow(definition) assert any("invalid default" in e for e in errors), errors def test_validate_workflow_exempts_integration_auto_sentinel(self): """``integration: auto`` is a runtime-resolved sentinel and must not fail validation.""" from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "auto-ok" name: "Auto OK" version: "1.0.0" inputs: integration: type: string default: "auto" enum: ["copilot", "claude", "gemini"] steps: - id: noop type: gate message: "noop" options: [approve] """) errors = validate_workflow(definition) assert not any("invalid default" in e for e in errors), errors def test_validate_workflow_still_checks_type_for_auto_sentinel(self): """The ``auto`` exemption only skips enum-membership; declared type is still enforced.""" from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "auto-bad-type" name: "Auto Bad Type" version: "1.0.0" inputs: integration: type: number default: "auto" steps: - id: noop type: gate message: "noop" options: [approve] """) errors = validate_workflow(definition) assert any("invalid default" in e for e in errors), errors def test_validate_workflow_rejects_bool_default_for_number_type(self): """``type: number`` paired with a bool default must fail — bool is a subclass of int so ``float(True)`` would otherwise silently coerce ``true`` to ``1``. 
""" from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "bool-as-number" name: "Bool As Number" version: "1.0.0" inputs: count: type: number default: true steps: - id: noop type: gate message: "noop" options: [approve] """) errors = validate_workflow(definition) assert any("invalid default" in e for e in errors), errors def test_validate_workflow_rejects_non_string_default_for_string_type(self): """``type: string`` must require an actual string — a numeric YAML default like ``5`` would otherwise slip through unvalidated. """ from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "number-as-string" name: "Number As String" version: "1.0.0" inputs: label: type: string default: 5 steps: - id: noop type: gate message: "noop" options: [approve] """) errors = validate_workflow(definition) assert any("invalid default" in e for e in errors), errors def test_while_loop_condition_reads_latest_iteration(self, project_dir): """Regression: while-loop condition must see updated step output from the most recent iteration, not stale iteration-0 data. See https://github.com/github/spec-kit/issues/2592 """ from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition from specify_cli.workflows.base import RunStatus # Shell step echoes a counter via a file. # Condition: exit_code != 0 means "keep looping" — but a non-zero # exit code would mark the step FAILED and abort the run, so we # use stdout-based comparison instead. # # Iteration 0: counter=1, echoes "1" → not "done" → loop continues # Iteration 1: counter=2, echoes "done" → condition false → stop # Without the fix, condition always reads iteration-0 stdout, # so the loop runs all max_iterations. 
import sys counter_file = project_dir / ".counter" counter_file.write_text("0", encoding="utf-8") py = sys.executable script_file = project_dir / "_tick.py" script_file.write_text( f"import pathlib; p = pathlib.Path(r'{counter_file}')\n" "n = int(p.read_text()) + 1; p.write_text(str(n))\n" "print('done' if n >= 2 else str(n), end='')\n", encoding="utf-8", ) yaml_str = f""" schema_version: "1.0" workflow: id: "while-condition-update" name: "While Condition Update" version: "1.0.0" steps: - id: retry-loop type: while condition: "{{{{ 'done' not in steps.attempt.output.stdout }}}}" max_iterations: 5 steps: - id: attempt type: shell run: '"{py}" "{script_file}"' """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.COMPLETED # The unprefixed key should reflect the latest iteration's result. assert state.step_results["attempt"]["output"]["stdout"] == "done" # Namespaced iteration-1 result should also exist. assert "retry-loop:attempt:1" in state.step_results # Counter should be 2 (iteration 0 + iteration 1), not 5. assert counter_file.read_text(encoding="utf-8").strip() == "2" def test_do_while_loop_condition_reads_latest_iteration(self, project_dir): """Regression: do-while loop condition must also see updated output. 
See https://github.com/github/spec-kit/issues/2592 """ from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition from specify_cli.workflows.base import RunStatus import sys counter_file = project_dir / ".counter" counter_file.write_text("0", encoding="utf-8") py = sys.executable script_file = project_dir / "_tick.py" script_file.write_text( f"import pathlib; p = pathlib.Path(r'{counter_file}')\n" "n = int(p.read_text()) + 1; p.write_text(str(n))\n" "print('done' if n >= 2 else str(n), end='')\n", encoding="utf-8", ) yaml_str = f""" schema_version: "1.0" workflow: id: "do-while-condition-update" name: "Do While Condition Update" version: "1.0.0" steps: - id: retry-loop type: do-while condition: "{{{{ 'done' not in steps.attempt.output.stdout }}}}" max_iterations: 5 steps: - id: attempt type: shell run: '"{py}" "{script_file}"' """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.COMPLETED assert state.step_results["attempt"]["output"]["stdout"] == "done" assert counter_file.read_text(encoding="utf-8").strip() == "2" def test_while_loop_runs_to_max_when_condition_stays_true(self, project_dir): """While loop must still run to max_iterations when the condition never becomes false — copy-back must not break this path. 
See https://github.com/github/spec-kit/issues/2592 """ from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition from specify_cli.workflows.base import RunStatus import sys counter_file = project_dir / ".counter" counter_file.write_text("0", encoding="utf-8") py = sys.executable script_file = project_dir / "_tick.py" script_file.write_text( f"import pathlib; p = pathlib.Path(r'{counter_file}')\n" "n = int(p.read_text()) + 1; p.write_text(str(n))\n" "print('pending', end='')\n", encoding="utf-8", ) yaml_str = f""" schema_version: "1.0" workflow: id: "while-max-iterations" name: "While Max Iterations" version: "1.0.0" steps: - id: retry-loop type: while condition: "{{{{ 'done' not in steps.tick.output.stdout }}}}" max_iterations: 3 steps: - id: tick type: shell run: '"{py}" "{script_file}"' """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.COMPLETED # All 3 iterations ran (iteration 0 + 2 loop iterations). assert counter_file.read_text(encoding="utf-8").strip() == "3" # Unprefixed key holds the last iteration's result. assert state.step_results["tick"]["output"]["stdout"] == "pending" # Namespaced keys for loop iterations exist. assert "retry-loop:tick:1" in state.step_results assert "retry-loop:tick:2" in state.step_results def test_do_while_loop_runs_to_max_when_condition_stays_true(self, project_dir): """Do-while loop must still run to max_iterations when the condition never becomes false. 
See https://github.com/github/spec-kit/issues/2592 """ from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition from specify_cli.workflows.base import RunStatus import sys counter_file = project_dir / ".counter" counter_file.write_text("0", encoding="utf-8") py = sys.executable script_file = project_dir / "_tick.py" script_file.write_text( f"import pathlib; p = pathlib.Path(r'{counter_file}')\n" "n = int(p.read_text()) + 1; p.write_text(str(n))\n" "print('pending', end='')\n", encoding="utf-8", ) yaml_str = f""" schema_version: "1.0" workflow: id: "do-while-max-iterations" name: "Do While Max Iterations" version: "1.0.0" steps: - id: retry-loop type: do-while condition: "{{{{ 'done' not in steps.tick.output.stdout }}}}" max_iterations: 3 steps: - id: tick type: shell run: '"{py}" "{script_file}"' """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.COMPLETED assert counter_file.read_text(encoding="utf-8").strip() == "3" assert state.step_results["tick"]["output"]["stdout"] == "pending" def test_while_loop_multi_step_body_inter_step_refs(self, project_dir): """Multi-step loop body: step B must see step A's output from the current iteration, not a stale previous one. See https://github.com/github/spec-kit/issues/2592 """ from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition from specify_cli.workflows.base import RunStatus import sys counter_file = project_dir / ".counter" counter_file.write_text("0", encoding="utf-8") py = sys.executable # Step A: increments counter file, echoes the value. 
step_a_file = project_dir / "_step_a.py" step_a_file.write_text( f"import pathlib; p = pathlib.Path(r'{counter_file}')\n" "n = int(p.read_text()) + 1; p.write_text(str(n))\n" "print(str(n), end='')\n", encoding="utf-8", ) # Step B uses {{ steps.step-a.output.stdout }} expression # substitution in its run command so the engine resolves the # aliased unprefixed key — this is the real inter-step test. yaml_str = f""" schema_version: "1.0" workflow: id: "while-multi-step" name: "While Multi Step" version: "1.0.0" steps: - id: retry-loop type: while condition: "{{{{ 'done' not in steps.step-a.output.stdout }}}}" max_iterations: 3 steps: - id: step-a type: shell run: '"{py}" "{step_a_file}"' - id: step-b type: shell run: "echo b-saw-{{{{ steps.step-a.output.stdout }}}}" """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.COMPLETED # Both unprefixed keys reflect the latest iteration's results. assert state.step_results["step-a"]["output"]["stdout"] == "3" # Step B saw step A's output via expression substitution. assert "b-saw-3" in state.step_results["step-b"]["output"]["stdout"] # Namespaced keys exist for loop iterations. assert "retry-loop:step-a:1" in state.step_results assert "retry-loop:step-b:1" in state.step_results assert "retry-loop:step-a:2" in state.step_results assert "retry-loop:step-b:2" in state.step_results # ===== context.run_id Tests ===== # # End-to-end coverage for the `{{ context.run_id }}` template # variable introduced in issue #2590. Locks resolution inside the # three step types the acceptance criteria called out — shell `run:`, # command `input.args:`, and switch `expression:` — plus the # "workflow doesn't reference it" backward-compat path. 
class TestContextRunId:
    """End-to-end tests for `{{ context.run_id }}` in workflow YAML."""

    def test_shell_run_resolves_run_id(self, project_dir):
        """`run: "echo {{ context.run_id }}"` substitutes the
        engine-assigned run id into the spawned shell, and the same
        value appears on `state.run_id`.
        """
        from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine

        definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
  id: "stamp-run-id"
  name: "Stamp Run Id"
  version: "1.0.0"
steps:
  - id: stamp
    type: shell
    run: "echo RUN_ID={{ context.run_id }}"
""")
        engine = WorkflowEngine(project_dir)
        state = engine.execute(definition, run_id="abc12345")

        assert state.run_id == "abc12345"
        stdout = state.step_results["stamp"]["output"]["stdout"]
        assert stdout.strip() == "RUN_ID=abc12345"

    def test_command_input_args_resolves_run_id(self, project_dir):
        """`input.args: "{{ context.run_id }}"` is resolved by
        `CommandStep` and recorded in step output, even when CLI
        dispatch is unavailable (no integration installed). Covers the
        artifact-metadata use case from the issue.
        """
        from unittest.mock import patch

        from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine

        definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
  id: "command-stamp"
  name: "Command Stamp"
  version: "1.0.0"
  integration: claude
steps:
  - id: tag-artifact
    command: speckit.specify
    input:
      args: "{{ context.run_id }}"
""")
        engine = WorkflowEngine(project_dir)
        # Pretend no CLI binary is on PATH so dispatch cannot happen.
        with patch(
            "specify_cli.workflows.steps.command.shutil.which",
            return_value=None,
        ):
            state = engine.execute(definition, run_id="cafef00d")

        # Even when dispatch fails (no CLI), the resolved input is
        # recorded so downstream observers see the run id in artifact
        # metadata.
        assert state.step_results["tag-artifact"]["output"]["input"]["args"] == "cafef00d"

    def test_switch_expression_matches_on_run_id(self, project_dir):
        """`switch` over `{{ context.run_id }}` matches against case keys,
        and the nested branch can ALSO reference `{{ context.run_id }}`.

        Demonstrates the run id is a first-class value in the expression
        engine (not just a string-interpolation token) AND that it
        propagates into nested step execution via the recursive
        `_execute_steps` traversal.
        """
        from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine
        from specify_cli.workflows.base import RunStatus

        definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
  id: "switch-on-run-id"
  name: "Switch On Run Id"
  version: "1.0.0"
steps:
  - id: route
    type: switch
    expression: "{{ context.run_id }}"
    cases:
      target-run:
        - id: matched-branch
          type: shell
          run: "echo nested-run-id={{ context.run_id }}"
    default:
      - id: default-branch
        type: shell
        run: "echo defaulted"
""")
        engine = WorkflowEngine(project_dir)
        state = engine.execute(definition, run_id="target-run")

        assert state.status == RunStatus.COMPLETED
        assert state.step_results["route"]["output"]["matched_case"] == "target-run"
        assert "matched-branch" in state.step_results
        assert "default-branch" not in state.step_results
        # The nested branch sees the same run id — propagation through
        # recursive `_execute_steps` is intact.
        nested_stdout = state.step_results["matched-branch"]["output"]["stdout"]
        assert nested_stdout.strip() == "nested-run-id=target-run"

    def test_workflow_without_context_reference_unchanged(self, project_dir):
        """Workflows that do not reference `{{ context.run_id }}` continue
        to run exactly as before. Locks the byte-equivalent default
        required by the issue's acceptance criteria.
        """
        from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine
        from specify_cli.workflows.base import RunStatus

        definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
  id: "no-context-ref"
  name: "No Context Ref"
  version: "1.0.0"
steps:
  - id: only-step
    type: shell
    run: "echo hello"
""")
        engine = WorkflowEngine(project_dir)
        state = engine.execute(definition)

        assert state.status == RunStatus.COMPLETED
        assert state.step_results["only-step"]["output"]["stdout"].strip() == "hello"

    def test_run_id_uses_speckit_workflow_run_id_env_override(self, project_dir, monkeypatch):
        """When no run_id argument is provided, SPECKIT_WORKFLOW_RUN_ID overrides the auto-generated run ID."""
        from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine

        monkeypatch.setenv("SPECKIT_WORKFLOW_RUN_ID", "env-run-123")
        definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
  id: "env-run-id"
  name: "Env Run Id"
  version: "1.0.0"
steps:
  - id: stamp
    type: shell
    run: "echo {{ context.run_id }}"
""")
        state = WorkflowEngine(project_dir).execute(definition)

        assert state.run_id == "env-run-123"
        assert state.step_results["stamp"]["output"]["stdout"].strip() == "env-run-123"

    def test_run_id_arg_takes_precedence_over_env_override(self, project_dir, monkeypatch):
        """Explicit run_id keeps existing precedence over SPECKIT_WORKFLOW_RUN_ID."""
        from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine

        monkeypatch.setenv("SPECKIT_WORKFLOW_RUN_ID", "env-run-123")
        definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
  id: "explicit-run-id"
  name: "Explicit Run Id"
  version: "1.0.0"
steps:
  - id: stamp
    type: shell
    run: "echo {{ context.run_id }}"
""")
        state = WorkflowEngine(project_dir).execute(definition, run_id="explicit-456")

        assert state.run_id == "explicit-456"
        assert state.step_results["stamp"]["output"]["stdout"].strip() == "explicit-456"


# ===== continue_on_error Tests =====
#
# Locks the
contract documented in workflows/README.md "Error Handling" # section: when a step returns `StepResult(status=StepStatus.FAILED, ...)` and # `continue_on_error: true` is declared, the engine records the step's # `output` (with `exit_code` and `stderr` from the failure) and its # `status` (sibling key on `steps.`, not nested under `output`) # and continues to the next sibling step instead of halting the run. # Gate aborts (`output.aborted`) still halt regardless of the flag. # Unhandled exceptions raised out of `step_impl.execute()` are out of # scope for this flag — they propagate to `WorkflowEngine.execute()` # and abort the run. class TestContinueOnError: """Test the `continue_on_error` step-level field.""" def test_undeclared_failure_halts_run(self, project_dir): """Default behaviour (no `continue_on_error`): a failing step halts the workflow run with `status == StepStatus.FAILED`. Locks the byte-equivalent default — workflows that do not declare the flag must behave exactly as before this feature. """ from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine from specify_cli.workflows.base import RunStatus definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "halt-on-fail" name: "Halt On Fail" version: "1.0.0" steps: - id: fail-step type: shell run: "exit 7" - id: after type: shell run: "echo should-not-run" """) engine = WorkflowEngine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.FAILED assert "fail-step" in state.step_results assert state.step_results["fail-step"]["output"]["exit_code"] == 7 # Subsequent step never executes when the flag is absent. assert "after" not in state.step_results def test_declared_and_fired_continues_run(self, project_dir): """`continue_on_error: true` + failing step: the run keeps going, the failed step's result is recorded, and the downstream step runs. 
""" from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine from specify_cli.workflows.base import RunStatus definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "continue-past-fail" name: "Continue Past Fail" version: "1.0.0" steps: - id: flaky-step type: shell run: "exit 42" continue_on_error: true - id: after type: shell run: "echo did-run" """) engine = WorkflowEngine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.COMPLETED # Failed step's exit_code is preserved so downstream branching # can inspect it. assert state.step_results["flaky-step"]["output"]["exit_code"] == 42 assert state.step_results["flaky-step"]["status"] == "failed" # Downstream step ran successfully. assert state.step_results["after"]["output"]["exit_code"] == 0 def test_declared_but_step_succeeded_is_noop(self, project_dir): """`continue_on_error: true` on a step that succeeds is a no-op — the flag only changes behaviour on StepStatus.FAILED status. """ from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine from specify_cli.workflows.base import RunStatus definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "flag-but-success" name: "Flag But Success" version: "1.0.0" steps: - id: ok-step type: shell run: "echo ok" continue_on_error: true - id: after type: shell run: "echo done" """) engine = WorkflowEngine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.COMPLETED assert state.step_results["ok-step"]["status"] == "completed" assert state.step_results["ok-step"]["output"]["exit_code"] == 0 assert state.step_results["after"]["output"]["exit_code"] == 0 def test_if_branch_routes_around_failure(self, project_dir): """End-to-end: `continue_on_error` + `if` cleanly routes around a failure. The recovery branch runs; the success branch does not. Mirrors the canonical usage pattern from the original feature discussion in issue #2591. 
""" from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine from specify_cli.workflows.base import RunStatus definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "route-around" name: "Route Around Failure" version: "1.0.0" steps: - id: heavy-thing type: shell run: "exit 1" continue_on_error: true - id: check-result type: if condition: "{{ steps.heavy-thing.output.exit_code != 0 }}" then: - id: recovery type: shell run: "echo recovery-ran" else: - id: happy-path type: shell run: "echo happy-path-ran" """) engine = WorkflowEngine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.COMPLETED assert "recovery" in state.step_results assert "happy-path" not in state.step_results def test_gate_abort_still_halts_with_continue_on_error( self, project_dir, monkeypatch ): """`continue_on_error` does NOT override a deliberate gate abort. `output.aborted` always halts the run with `status == ABORTED`. Aborts are explicit operator decisions; continue_on_error is for transient/expected step failures only. """ from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine from specify_cli.workflows.base import RunStatus from specify_cli.workflows.steps.gate import GateStep from specify_cli.workflows.steps import gate as gate_module # Force the gate step into interactive mode and feed a "reject" # choice so the abort path actually runs in the test env # (default behaviour returns StepStatus.PAUSED when stdin is not a TTY). # Swap sys.stdin itself for a stub: setattr on the real # TextIOWrapper's `isatty` method is not assignable under some # runners (e.g. pytest with capture disabled). 
class _TTYStdin: def isatty(self) -> bool: return True monkeypatch.setattr(gate_module.sys, "stdin", _TTYStdin()) monkeypatch.setattr( GateStep, "_prompt", staticmethod(lambda _msg, _opts: "reject") ) definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "gate-abort-halts" name: "Gate Abort Halts" version: "1.0.0" steps: - id: gate-step type: gate message: "Approve?" options: [approve, reject] on_reject: abort continue_on_error: true - id: should-not-run type: shell run: "echo nope" """) engine = WorkflowEngine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.ABORTED assert "should-not-run" not in state.step_results def test_validation_rejects_non_bool_continue_on_error(self): """`continue_on_error` must be a literal boolean; coerced strings like `"true"` are rejected at validation time so authoring mistakes surface before execution. """ from specify_cli.workflows.engine import ( WorkflowDefinition, validate_workflow, ) definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "bad-coe" name: "Bad COE" version: "1.0.0" steps: - id: step-one type: shell run: "true" continue_on_error: "true" """) errors = validate_workflow(definition) assert any( "continue_on_error" in e and "boolean" in e for e in errors ), errors def test_validation_accepts_bool_continue_on_error(self): """Boolean values pass validation cleanly.""" from specify_cli.workflows.engine import ( WorkflowDefinition, validate_workflow, ) for value in (True, False): yaml_value = "true" if value else "false" definition = WorkflowDefinition.from_string(f""" schema_version: "1.0" workflow: id: "good-coe" name: "Good COE" version: "1.0.0" steps: - id: step-one type: shell run: "true" continue_on_error: {yaml_value} """) errors = validate_workflow(definition) assert errors == [], errors def test_engine_ignores_truthy_non_bool_continue_on_error(self, project_dir): """Defense-in-depth: even if a caller bypasses 
`validate_workflow()` and feeds the engine a definition with `continue_on_error: "true"` (a string), the engine must NOT honour the flag — only a literal boolean enables the behaviour. `WorkflowEngine.execute()` does not auto-validate (the `WorkflowEngine.load_workflow` docstring explicitly notes the definition is "not yet validated; call `validate_workflow()` or `engine.validate()` separately"), so the engine guards against truthy non-bool values itself via an identity check rather than truthiness. """ from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine from specify_cli.workflows.base import RunStatus # Bypass `validate_workflow()` — execute() is what would # be called by a caller that skipped validation. definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "string-coe" name: "String COE" version: "1.0.0" steps: - id: fail-step type: shell run: "exit 1" continue_on_error: "true" - id: should-not-run type: shell run: "echo should-not-run" """) engine = WorkflowEngine(project_dir) state = engine.execute(definition) # String "true" is truthy but not a literal boolean, so the # engine must treat the step as a halting failure. 
assert state.status == RunStatus.FAILED assert "should-not-run" not in state.step_results # ===== State Persistence Tests ===== class TestRunState: """Test RunState persistence and loading.""" def test_save_and_load(self, project_dir): from specify_cli.workflows.engine import RunState from specify_cli.workflows.base import RunStatus state = RunState( run_id="test-run", workflow_id="test-workflow", project_root=project_dir, ) state.status = RunStatus.RUNNING state.inputs = {"name": "login"} state.step_results = { "step-one": { "output": {"file": "spec.md"}, "status": "completed", } } state.save() loaded = RunState.load("test-run", project_dir) assert loaded.run_id == "test-run" assert loaded.workflow_id == "test-workflow" assert loaded.status == RunStatus.RUNNING assert loaded.inputs == {"name": "login"} assert "step-one" in loaded.step_results def test_load_not_found(self, project_dir): from specify_cli.workflows.engine import RunState with pytest.raises(FileNotFoundError): RunState.load("nonexistent", project_dir) @pytest.mark.parametrize( "malicious_run_id", [ # Parent-directory traversal — the classic path-escape vector. "../escape", "..", "../../etc/passwd", # Embedded path separators — both POSIX and Windows. "foo/bar", "foo\\bar", # Leading non-alphanumeric characters that the existing # pattern's anchor blocks (would be mistaken for CLI flags # or hidden files in shell completions / error messages). ".hidden", "-flag", # NUL byte — some filesystems treat the prefix as a valid # path and silently truncate at the NUL. "foo\x00bar", # Empty string — degenerate case, matches no file but the # validator should reject it before any I/O. "", ], ) def test_load_rejects_path_traversal(self, project_dir, malicious_run_id): """``RunState.load`` validates ``run_id`` before touching the filesystem. Without this guard, a value like ``../escape`` passed via ``specify workflow resume`` would interpolate path-traversal segments into the lookup path. 
``state_path.exists()`` would probe arbitrary paths the process can read (a file-existence oracle) and ``json.load`` would happily parse attacker-planted JSON from outside ``.specify/workflows/runs/``. The check must fire *before* the path is built — ``__init__``'s identical regex on ``state_data["run_id"]`` fires too late. """ from specify_cli.workflows.engine import RunState # Plant a state.json *outside* the legitimate ``runs/`` directory # at the location ``../escape`` would traverse to, so a missing # guard would surface as a successful load rather than a # ``FileNotFoundError`` (which would be ambiguous with the # not-found case). runs_dir = project_dir / ".specify" / "workflows" / "runs" runs_dir.mkdir(parents=True, exist_ok=True) attacker_dir = project_dir / ".specify" / "workflows" / "escape" attacker_dir.mkdir(exist_ok=True) (attacker_dir / "state.json").write_text( json.dumps( { "run_id": "pwned", "workflow_id": "attacker-owned", "status": "created", } ), encoding="utf-8", ) with pytest.raises(ValueError, match="Invalid run_id"): RunState.load(malicious_run_id, project_dir) @pytest.mark.parametrize( "bad_run_id", [ # One vector per category from ``test_load_rejects_path_traversal`` # — enough to prove both entry points agree without re-running # the full attack matrix here. "../escape", # parent-directory traversal "foo/bar", # embedded path separator ".hidden", # leading non-alphanumeric "", # empty / degenerate ], ) def test_init_and_load_share_validation(self, project_dir, bad_run_id): """``__init__`` *and* ``load`` reject the same malformed IDs. The two entry points must stay in sync — drift would let an ID slip in via one path that the other would reject, producing confusing crashes mid-workflow. The previous version of this test only exercised ``__init__`` and ``_validate_run_id`` (the shared helper), so a regression in ``load`` — e.g. 
someone deleting the ``cls._validate_run_id(run_id)`` call there — could slip through despite ``__init__`` and the helper staying aligned. We now hit ``load`` directly with the same vector so any drift between the two call sites is caught by this test. """ from specify_cli.workflows.engine import RunState # ``__init__`` rejects up front. with pytest.raises(ValueError, match="Invalid run_id"): RunState(run_id=bad_run_id) # The shared helper rejects the value too (sanity check that the # ``__init__`` rejection came from the validator, not some # unrelated constructor failure). with pytest.raises(ValueError, match="Invalid run_id"): RunState._validate_run_id(bad_run_id) # And ``load`` rejects it *before* touching the filesystem. This # is the assertion the previous version was missing: without it, # a regression in ``load`` (e.g. forgetting to call the # validator before building the path) would not be caught even # though ``__init__`` and the helper still agreed. with pytest.raises(ValueError, match="Invalid run_id"): RunState.load(bad_run_id, project_dir) def test_append_log(self, project_dir): from specify_cli.workflows.engine import RunState state = RunState( run_id="log-test", workflow_id="test", project_root=project_dir, ) state.append_log({"event": "test_event", "data": "hello"}) log_file = state.runs_dir / "log.jsonl" assert log_file.exists() lines = log_file.read_text().strip().split("\n") entry = json.loads(lines[0]) assert entry["event"] == "test_event" assert "timestamp" in entry class TestListRuns: """Test listing workflow runs.""" def test_list_empty(self, project_dir): from specify_cli.workflows.engine import WorkflowEngine engine = WorkflowEngine(project_dir) assert engine.list_runs() == [] def test_list_after_execution(self, project_dir): from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition yaml_str = """ schema_version: "1.0" workflow: id: "list-test" name: "List Test" version: "1.0.0" steps: - id: step-one type: shell run: 
"echo test" """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) engine.execute(definition) runs = engine.list_runs() assert len(runs) == 1 assert runs[0]["workflow_id"] == "list-test" # ===== Workflow Registry Tests ===== class TestWorkflowRegistry: """Test WorkflowRegistry operations.""" def test_add_and_get(self, project_dir): from specify_cli.workflows.catalog import WorkflowRegistry registry = WorkflowRegistry(project_dir) registry.add("test-wf", {"name": "Test", "version": "1.0.0"}) entry = registry.get("test-wf") assert entry is not None assert entry["name"] == "Test" assert "installed_at" in entry def test_remove(self, project_dir): from specify_cli.workflows.catalog import WorkflowRegistry registry = WorkflowRegistry(project_dir) registry.add("test-wf", {"name": "Test"}) assert registry.is_installed("test-wf") registry.remove("test-wf") assert not registry.is_installed("test-wf") def test_list(self, project_dir): from specify_cli.workflows.catalog import WorkflowRegistry registry = WorkflowRegistry(project_dir) registry.add("wf-a", {"name": "A"}) registry.add("wf-b", {"name": "B"}) installed = registry.list() assert "wf-a" in installed assert "wf-b" in installed def test_is_installed(self, project_dir): from specify_cli.workflows.catalog import WorkflowRegistry registry = WorkflowRegistry(project_dir) assert not registry.is_installed("missing") registry.add("exists", {"name": "Exists"}) assert registry.is_installed("exists") def test_persistence(self, project_dir): from specify_cli.workflows.catalog import WorkflowRegistry registry1 = WorkflowRegistry(project_dir) registry1.add("test-wf", {"name": "Test"}) # Load fresh registry2 = WorkflowRegistry(project_dir) assert registry2.is_installed("test-wf") # ===== Workflow Catalog Tests ===== class TestWorkflowCatalog: """Test WorkflowCatalog catalog resolution.""" def test_default_catalogs(self, project_dir): from specify_cli.workflows.catalog import WorkflowCatalog 
catalog = WorkflowCatalog(project_dir) entries = catalog.get_active_catalogs() assert len(entries) == 2 assert entries[0].name == "default" assert entries[1].name == "community" def test_env_var_override(self, project_dir, monkeypatch): from specify_cli.workflows.catalog import WorkflowCatalog monkeypatch.setenv("SPECKIT_WORKFLOW_CATALOG_URL", "https://example.com/catalog.json") catalog = WorkflowCatalog(project_dir) entries = catalog.get_active_catalogs() assert len(entries) == 1 assert entries[0].name == "env-override" assert entries[0].url == "https://example.com/catalog.json" def test_project_level_config(self, project_dir): from specify_cli.workflows.catalog import WorkflowCatalog config_path = project_dir / ".specify" / "workflow-catalogs.yml" config_path.write_text(yaml.dump({ "catalogs": [{ "name": "custom", "url": "https://example.com/wf-catalog.json", "priority": 1, "install_allowed": True, }] })) catalog = WorkflowCatalog(project_dir) entries = catalog.get_active_catalogs() assert len(entries) == 1 assert entries[0].name == "custom" def test_validate_url_http_rejected(self, project_dir): from specify_cli.workflows.catalog import WorkflowCatalog, WorkflowValidationError catalog = WorkflowCatalog(project_dir) with pytest.raises(WorkflowValidationError, match="HTTPS"): catalog._validate_catalog_url("http://evil.com/catalog.json") def test_validate_url_localhost_http_allowed(self, project_dir): from specify_cli.workflows.catalog import WorkflowCatalog catalog = WorkflowCatalog(project_dir) # Should not raise catalog._validate_catalog_url("http://localhost:8080/catalog.json") def test_add_catalog(self, project_dir): from specify_cli.workflows.catalog import WorkflowCatalog catalog = WorkflowCatalog(project_dir) catalog.add_catalog("https://example.com/new-catalog.json", "my-catalog") config_path = project_dir / ".specify" / "workflow-catalogs.yml" assert config_path.exists() data = yaml.safe_load(config_path.read_text()) assert len(data["catalogs"]) == 1 
assert data["catalogs"][0]["url"] == "https://example.com/new-catalog.json" def test_add_catalog_duplicate_rejected(self, project_dir): from specify_cli.workflows.catalog import WorkflowCatalog, WorkflowValidationError catalog = WorkflowCatalog(project_dir) catalog.add_catalog("https://example.com/catalog.json") with pytest.raises(WorkflowValidationError, match="already configured"): catalog.add_catalog("https://example.com/catalog.json") def test_remove_catalog(self, project_dir): from specify_cli.workflows.catalog import WorkflowCatalog catalog = WorkflowCatalog(project_dir) catalog.add_catalog("https://example.com/c1.json", "first") catalog.add_catalog("https://example.com/c2.json", "second") removed = catalog.remove_catalog(0) assert removed == "first" config_path = project_dir / ".specify" / "workflow-catalogs.yml" data = yaml.safe_load(config_path.read_text()) assert len(data["catalogs"]) == 1 def test_remove_catalog_invalid_index(self, project_dir): from specify_cli.workflows.catalog import WorkflowCatalog, WorkflowValidationError catalog = WorkflowCatalog(project_dir) catalog.add_catalog("https://example.com/c1.json") with pytest.raises(WorkflowValidationError, match="out of range"): catalog.remove_catalog(5) def test_get_catalog_configs(self, project_dir): from specify_cli.workflows.catalog import WorkflowCatalog catalog = WorkflowCatalog(project_dir) configs = catalog.get_catalog_configs() assert len(configs) == 2 assert configs[0]["name"] == "default" assert isinstance(configs[0]["install_allowed"], bool) # ===== Integration Test ===== class TestWorkflowIntegration: """End-to-end workflow execution tests.""" def test_full_sequential_workflow(self, project_dir): """Execute a multi-step sequential workflow end to end.""" from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition from specify_cli.workflows.base import RunStatus yaml_str = """ schema_version: "1.0" workflow: id: "e2e-test" name: "E2E Test" version: "1.0.0" integration: 
claude inputs: feature: type: string default: "login" steps: - id: specify type: shell run: "echo speckit.specify {{ inputs.feature }}" - id: check-scope type: if condition: "{{ inputs.feature == 'login' }}" then: - id: echo-full type: shell run: "echo full scope" else: - id: echo-partial type: shell run: "echo partial scope" - id: plan type: shell run: "echo speckit.plan" """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.COMPLETED assert "specify" in state.step_results assert "check-scope" in state.step_results assert "echo-full" in state.step_results assert "echo-partial" not in state.step_results assert "plan" in state.step_results def test_switch_workflow(self, project_dir): """Test switch step type in a workflow.""" from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition from specify_cli.workflows.base import RunStatus yaml_str = """ schema_version: "1.0" workflow: id: "switch-test" name: "Switch Test" version: "1.0.0" inputs: action: type: string default: "plan" steps: - id: route type: switch expression: "{{ inputs.action }}" cases: specify: - id: do-specify type: shell run: "echo specify" plan: - id: do-plan type: shell run: "echo plan" default: - id: do-default type: shell run: "echo default" """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.COMPLETED assert "do-plan" in state.step_results assert "do-specify" not in state.step_results class TestWorkflowJsonOutput: """Test the --json machine-readable output for run/resume/status.""" _WF = """ schema_version: "1.0" workflow: id: "json-wf" name: "JSON WF" version: "1.0.0" steps: - id: ask type: gate message: "Review" options: [approve, reject] - id: after type: shell run: "echo done" """ _WF_DONE = """ schema_version: "1.0" workflow: id: "json-done" name: "JSON Done" 
version: "1.0.0" steps: - id: only type: shell run: "echo done" """ def _write_wf(self, project_dir, text, name): path = project_dir / f"{name}.yml" path.write_text(text, encoding="utf-8") return path def _invoke(self, project_dir, args): from typer.testing import CliRunner from unittest.mock import patch from specify_cli import app runner = CliRunner() with patch.object(Path, "cwd", return_value=project_dir): return runner.invoke(app, args, catch_exceptions=False) def test_run_json_completed(self, project_dir): wf = self._write_wf(project_dir, self._WF_DONE, "done") result = self._invoke(project_dir, ["workflow", "run", str(wf), "--json"]) assert result.exit_code == 0 payload = json.loads(result.stdout) assert payload["workflow_id"] == "json-done" assert payload["status"] == "completed" assert "run_id" in payload def test_run_json_paused(self, project_dir): wf = self._write_wf(project_dir, self._WF, "gated") result = self._invoke(project_dir, ["workflow", "run", str(wf), "--json"]) assert result.exit_code == 0 payload = json.loads(result.stdout) assert payload["status"] == "paused" assert payload["current_step_id"] == "ask" assert payload["current_step_index"] == 0 def test_run_json_output_has_no_markup_or_ansi(self, project_dir): wf = self._write_wf(project_dir, self._WF_DONE, "clean") out = self._invoke( project_dir, ["workflow", "run", str(wf), "--json"] ).stdout # Machine output must be exactly the JSON object: no Rich markup # tags and no ANSI escape sequences leaking in. 
assert "\x1b[" not in out assert "[/" not in out assert out.strip() == json.dumps(json.loads(out), indent=2) def test_run_default_output_is_human_not_json(self, project_dir): wf = self._write_wf(project_dir, self._WF_DONE, "done2") result = self._invoke(project_dir, ["workflow", "run", str(wf)]) assert result.exit_code == 0 assert "Running workflow" in result.stdout with pytest.raises(json.JSONDecodeError): json.loads(result.stdout) def test_status_json_single_and_list(self, project_dir): wf = self._write_wf(project_dir, self._WF, "gated2") run = json.loads( self._invoke(project_dir, ["workflow", "run", str(wf), "--json"]).stdout ) rid = run["run_id"] single = json.loads( self._invoke(project_dir, ["workflow", "status", rid, "--json"]).stdout ) assert single["run_id"] == rid assert single["status"] == "paused" assert single["steps"]["ask"] == "paused" # status --json carries the same step-position fields as run/resume # so automation never has to branch on which command produced it. assert single["current_step_id"] == run["current_step_id"] assert single["current_step_index"] == run["current_step_index"] listing = json.loads( self._invoke(project_dir, ["workflow", "status", "--json"]).stdout ) assert any(r["run_id"] == rid for r in listing["runs"]) def test_resume_json(self, project_dir): wf = self._write_wf(project_dir, self._WF, "gated3") rid = json.loads( self._invoke(project_dir, ["workflow", "run", str(wf), "--json"]).stdout )["run_id"] # Non-interactive resume re-runs the gate, which pauses again. resumed = json.loads( self._invoke(project_dir, ["workflow", "resume", rid, "--json"]).stdout ) assert resumed["run_id"] == rid assert resumed["status"] == "paused" def test_json_redirect_keeps_stdout_clean(self, capfd): # While a workflow runs under --json, steps can still write to stdout: # the gate step prints its prompt and the prompt step runs a # subprocess that inherits the stdout fd. 
Both must be redirected to # stderr so the JSON object on stdout stays parseable. capfd captures # at the file-descriptor level, so it sees the subprocess output too. import subprocess import sys as _sys from specify_cli import _stdout_to_stderr_when print("STDOUT_BEFORE") with _stdout_to_stderr_when(True): print("PY_LEAK") # Python-level write (gate-style) subprocess.run( # inherited-fd write (prompt-style) [_sys.executable, "-c", "print('SUBPROC_LEAK')"], check=True, ) print("STDOUT_AFTER") out, err = capfd.readouterr() # stdout keeps only what was written outside the guarded block. assert "STDOUT_BEFORE" in out and "STDOUT_AFTER" in out assert "PY_LEAK" not in out and "SUBPROC_LEAK" not in out # The step output is preserved on stderr, not discarded. assert "PY_LEAK" in err and "SUBPROC_LEAK" in err def test_json_redirect_inactive_is_noop(self, capfd): from specify_cli import _stdout_to_stderr_when with _stdout_to_stderr_when(False): print("VISIBLE_ON_STDOUT") out, _ = capfd.readouterr() assert "VISIBLE_ON_STDOUT" in out class TestResumeWithInputs: """Test that `workflow resume` can accept updated workflow inputs.""" _WF_CMD = """ schema_version: "1.0" workflow: id: "resume-cmd-wf" name: "Resume Cmd WF" version: "1.0.0" inputs: cmd: type: string default: "exit 1" steps: - id: s type: shell run: "{{ inputs.cmd }}" """ _WF_NUM = """ schema_version: "1.0" workflow: id: "resume-num-wf" name: "Resume Num WF" version: "1.0.0" inputs: count: type: number default: 1 steps: - id: gate type: gate message: "Review" options: [approve, reject] """ def _engine(self, project_dir): from specify_cli.workflows.engine import WorkflowEngine return WorkflowEngine(project_dir) def test_resume_with_input_reruns_step_with_new_value(self, project_dir): from specify_cli.workflows.engine import WorkflowDefinition from specify_cli.workflows.base import RunStatus definition = WorkflowDefinition.from_string(self._WF_CMD) engine = self._engine(project_dir) state = engine.execute(definition) 
assert state.status == RunStatus.FAILED # "exit 1" fails resumed = engine.resume(state.run_id, {"cmd": "exit 0"}) assert resumed.status == RunStatus.COMPLETED assert resumed.inputs["cmd"] == "exit 0" def test_resume_without_input_preserves_inputs(self, project_dir): from specify_cli.workflows.engine import WorkflowDefinition from specify_cli.workflows.base import RunStatus definition = WorkflowDefinition.from_string(self._WF_CMD) engine = self._engine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.FAILED resumed = engine.resume(state.run_id) assert resumed.status == RunStatus.FAILED # still "exit 1" assert resumed.inputs["cmd"] == "exit 1" def test_resume_merges_and_coerces_typed_input(self, project_dir): import json as _json from specify_cli.workflows.engine import WorkflowDefinition from specify_cli.workflows.base import RunStatus definition = WorkflowDefinition.from_string(self._WF_NUM) engine = self._engine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.PAUSED resumed = engine.resume(state.run_id, {"count": "5"}) assert resumed.inputs["count"] == 5 # coerced string -> number inputs_file = ( project_dir / ".specify" / "workflows" / "runs" / state.run_id / "inputs.json" ) assert _json.loads(inputs_file.read_text())["inputs"]["count"] == 5 def test_resume_invalid_typed_input_raises(self, project_dir): from specify_cli.workflows.engine import WorkflowDefinition definition = WorkflowDefinition.from_string(self._WF_NUM) engine = self._engine(project_dir) state = engine.execute(definition) with pytest.raises(ValueError): engine.resume(state.run_id, {"count": "not-a-number"}) def test_cli_resume_input_invalid_format_errors(self, project_dir): from typer.testing import CliRunner from unittest.mock import patch from specify_cli import app from specify_cli.workflows.engine import WorkflowDefinition definition = WorkflowDefinition.from_string(self._WF_NUM) state = 
self._engine(project_dir).execute(definition) runner = CliRunner() with patch.object(Path, "cwd", return_value=project_dir): result = runner.invoke( app, ["workflow", "resume", state.run_id, "--input", "bogus"] ) assert result.exit_code == 1 assert "Invalid input format" in result.stdout