mirror of
https://github.com/github/spec-kit.git
synced 2026-07-04 04:45:43 +08:00
* fix(copilot): use --yolo to grant all permissions in non-interactive mode The Copilot CLI's --allow-all-tools flag only covers tool execution permissions but does not grant path or URL access. When the Copilot agent autonomously runs shell commands (e.g. npm run build) during workflow execution, the CLI blocks path access and cannot prompt for approval in non-interactive mode, producing: Permission denied and could not request permission from user Replace --allow-all-tools with --yolo (equivalent to --allow-all-tools --allow-all-paths --allow-all-urls) to grant all three permission types. Rename the opt-out env var from SPECKIT_ALLOW_ALL_TOOLS to SPECKIT_COPILOT_ALLOW_ALL to match the formal --allow-all alias and scope it to the Copilot integration. Fixes #2294 * review: deprecate SPECKIT_ALLOW_ALL_TOOLS, rename to SPECKIT_COPILOT_ALLOW_ALL_TOOLS Address Copilot review feedback: - Honour the old SPECKIT_ALLOW_ALL_TOOLS env var as a fallback with a DeprecationWarning so existing opt-outs are not silently ignored. - Rename the new canonical env var to SPECKIT_COPILOT_ALLOW_ALL_TOOLS. - New var takes precedence when both are set. - Use monkeypatch in tests to avoid flakiness from ambient env vars. - Add tests for deprecation warning, precedence, and opt-out paths. * review: use UserWarning instead of DeprecationWarning DeprecationWarning is suppressed by default in Python, so users relying on the old SPECKIT_ALLOW_ALL_TOOLS env var would never see the deprecation notice during normal CLI runs. Switch to UserWarning which is always shown. Update test to also assert the warning category.
1846 lines
61 KiB
Python
1846 lines
61 KiB
Python
"""Tests for the workflow engine subsystem.
|
|
|
|
Covers:
|
|
- Step registry & auto-discovery
|
|
- Base classes (StepBase, StepContext, StepResult)
|
|
- Expression engine
|
|
- All 10 built-in step types
|
|
- Workflow definition loading & validation
|
|
- Workflow engine execution & state persistence
|
|
- Workflow catalog & registry
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import shutil
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
import yaml
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.fixture
|
|
def temp_dir():
|
|
"""Create a temporary directory for tests."""
|
|
tmpdir = tempfile.mkdtemp()
|
|
yield Path(tmpdir)
|
|
shutil.rmtree(tmpdir)
|
|
|
|
|
|
@pytest.fixture
|
|
def project_dir(temp_dir):
|
|
"""Create a mock spec-kit project with .specify/ directory."""
|
|
specify_dir = temp_dir / ".specify"
|
|
specify_dir.mkdir()
|
|
(specify_dir / "workflows").mkdir()
|
|
return temp_dir
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_workflow_yaml():
|
|
"""Return a valid minimal workflow YAML string."""
|
|
return """
|
|
schema_version: "1.0"
|
|
workflow:
|
|
id: "test-workflow"
|
|
name: "Test Workflow"
|
|
version: "1.0.0"
|
|
description: "A test workflow"
|
|
|
|
inputs:
|
|
spec:
|
|
type: string
|
|
required: true
|
|
scope:
|
|
type: string
|
|
default: "full"
|
|
|
|
steps:
|
|
- id: step-one
|
|
command: speckit.specify
|
|
input:
|
|
args: "{{ inputs.spec }}"
|
|
|
|
- id: step-two
|
|
command: speckit.plan
|
|
input:
|
|
args: "{{ steps.step-one.output.command }}"
|
|
"""
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_workflow_file(project_dir, sample_workflow_yaml):
|
|
"""Write a sample workflow YAML to a file and return its path."""
|
|
wf_dir = project_dir / ".specify" / "workflows" / "test-workflow"
|
|
wf_dir.mkdir(parents=True, exist_ok=True)
|
|
wf_path = wf_dir / "workflow.yml"
|
|
wf_path.write_text(sample_workflow_yaml, encoding="utf-8")
|
|
return wf_path
|
|
|
|
|
|
# ===== Step Registry Tests =====
|
|
|
|
class TestStepRegistry:
|
|
"""Test STEP_REGISTRY and auto-discovery."""
|
|
|
|
def test_registry_populated(self):
|
|
from specify_cli.workflows import STEP_REGISTRY
|
|
|
|
assert len(STEP_REGISTRY) >= 10
|
|
|
|
def test_all_step_types_registered(self):
|
|
from specify_cli.workflows import STEP_REGISTRY
|
|
|
|
expected = {
|
|
"command", "shell", "prompt", "gate", "if", "switch",
|
|
"while", "do-while", "fan-out", "fan-in",
|
|
}
|
|
assert expected.issubset(set(STEP_REGISTRY.keys()))
|
|
|
|
def test_get_step_type(self):
|
|
from specify_cli.workflows import get_step_type
|
|
|
|
step = get_step_type("command")
|
|
assert step is not None
|
|
assert step.type_key == "command"
|
|
|
|
def test_get_step_type_missing(self):
|
|
from specify_cli.workflows import get_step_type
|
|
|
|
assert get_step_type("nonexistent") is None
|
|
|
|
def test_register_step_duplicate_raises(self):
|
|
from specify_cli.workflows import _register_step
|
|
from specify_cli.workflows.steps.command import CommandStep
|
|
|
|
with pytest.raises(KeyError, match="already registered"):
|
|
_register_step(CommandStep())
|
|
|
|
def test_register_step_empty_key_raises(self):
|
|
from specify_cli.workflows import _register_step
|
|
from specify_cli.workflows.base import StepBase, StepResult
|
|
|
|
class EmptyStep(StepBase):
|
|
type_key = ""
|
|
def execute(self, config, context):
|
|
return StepResult()
|
|
|
|
with pytest.raises(ValueError, match="empty type_key"):
|
|
_register_step(EmptyStep())
|
|
|
|
|
|
# ===== Base Classes Tests =====
|
|
|
|
class TestBaseClasses:
|
|
"""Test StepBase, StepContext, StepResult."""
|
|
|
|
def test_step_context_defaults(self):
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
ctx = StepContext()
|
|
assert ctx.inputs == {}
|
|
assert ctx.steps == {}
|
|
assert ctx.item is None
|
|
assert ctx.fan_in == {}
|
|
assert ctx.default_integration is None
|
|
|
|
def test_step_context_with_data(self):
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
ctx = StepContext(
|
|
inputs={"name": "test"},
|
|
default_integration="claude",
|
|
default_model="sonnet-4",
|
|
)
|
|
assert ctx.inputs == {"name": "test"}
|
|
assert ctx.default_integration == "claude"
|
|
assert ctx.default_model == "sonnet-4"
|
|
|
|
def test_step_result_defaults(self):
|
|
from specify_cli.workflows.base import StepResult, StepStatus
|
|
|
|
result = StepResult()
|
|
assert result.status == StepStatus.COMPLETED
|
|
assert result.output == {}
|
|
assert result.next_steps == []
|
|
assert result.error is None
|
|
|
|
def test_step_status_values(self):
|
|
from specify_cli.workflows.base import StepStatus
|
|
|
|
assert StepStatus.PENDING == "pending"
|
|
assert StepStatus.RUNNING == "running"
|
|
assert StepStatus.COMPLETED == "completed"
|
|
assert StepStatus.FAILED == "failed"
|
|
assert StepStatus.SKIPPED == "skipped"
|
|
assert StepStatus.PAUSED == "paused"
|
|
|
|
def test_run_status_values(self):
|
|
from specify_cli.workflows.base import RunStatus
|
|
|
|
assert RunStatus.CREATED == "created"
|
|
assert RunStatus.RUNNING == "running"
|
|
assert RunStatus.PAUSED == "paused"
|
|
assert RunStatus.COMPLETED == "completed"
|
|
assert RunStatus.FAILED == "failed"
|
|
assert RunStatus.ABORTED == "aborted"
|
|
|
|
|
|
# ===== Expression Engine Tests =====
|
|
|
|
class TestExpressions:
|
|
"""Test sandboxed expression evaluator."""
|
|
|
|
def test_simple_variable(self):
|
|
from specify_cli.workflows.expressions import evaluate_expression
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
ctx = StepContext(inputs={"name": "login"})
|
|
assert evaluate_expression("{{ inputs.name }}", ctx) == "login"
|
|
|
|
def test_step_output_reference(self):
|
|
from specify_cli.workflows.expressions import evaluate_expression
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
ctx = StepContext(
|
|
steps={"specify": {"output": {"file": "spec.md"}}}
|
|
)
|
|
assert evaluate_expression("{{ steps.specify.output.file }}", ctx) == "spec.md"
|
|
|
|
def test_string_interpolation(self):
|
|
from specify_cli.workflows.expressions import evaluate_expression
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
ctx = StepContext(inputs={"name": "login"})
|
|
result = evaluate_expression("Feature: {{ inputs.name }} done", ctx)
|
|
assert result == "Feature: login done"
|
|
|
|
def test_comparison_equals(self):
|
|
from specify_cli.workflows.expressions import evaluate_expression
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
ctx = StepContext(inputs={"scope": "full"})
|
|
assert evaluate_expression("{{ inputs.scope == 'full' }}", ctx) is True
|
|
assert evaluate_expression("{{ inputs.scope == 'partial' }}", ctx) is False
|
|
|
|
def test_comparison_not_equals(self):
|
|
from specify_cli.workflows.expressions import evaluate_expression
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
ctx = StepContext(
|
|
steps={"run-tests": {"output": {"exit_code": 1}}}
|
|
)
|
|
result = evaluate_expression("{{ steps.run-tests.output.exit_code != 0 }}", ctx)
|
|
assert result is True
|
|
|
|
def test_numeric_comparison(self):
|
|
from specify_cli.workflows.expressions import evaluate_expression
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
ctx = StepContext(
|
|
steps={"plan": {"output": {"task_count": 7}}}
|
|
)
|
|
assert evaluate_expression("{{ steps.plan.output.task_count > 5 }}", ctx) is True
|
|
assert evaluate_expression("{{ steps.plan.output.task_count < 5 }}", ctx) is False
|
|
|
|
def test_boolean_and(self):
|
|
from specify_cli.workflows.expressions import evaluate_expression
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
ctx = StepContext(inputs={"a": True, "b": True})
|
|
assert evaluate_expression("{{ inputs.a and inputs.b }}", ctx) is True
|
|
|
|
def test_boolean_or(self):
|
|
from specify_cli.workflows.expressions import evaluate_expression
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
ctx = StepContext(inputs={"a": False, "b": True})
|
|
assert evaluate_expression("{{ inputs.a or inputs.b }}", ctx) is True
|
|
|
|
def test_filter_default(self):
|
|
from specify_cli.workflows.expressions import evaluate_expression
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
ctx = StepContext()
|
|
assert evaluate_expression("{{ inputs.missing | default('fallback') }}", ctx) == "fallback"
|
|
|
|
def test_filter_join(self):
|
|
from specify_cli.workflows.expressions import evaluate_expression
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
ctx = StepContext(inputs={"tags": ["a", "b", "c"]})
|
|
assert evaluate_expression("{{ inputs.tags | join(', ') }}", ctx) == "a, b, c"
|
|
|
|
def test_filter_contains(self):
|
|
from specify_cli.workflows.expressions import evaluate_expression
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
ctx = StepContext(inputs={"text": "hello world"})
|
|
assert evaluate_expression("{{ inputs.text | contains('world') }}", ctx) is True
|
|
|
|
def test_condition_evaluation(self):
|
|
from specify_cli.workflows.expressions import evaluate_condition
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
ctx = StepContext(inputs={"ready": True})
|
|
assert evaluate_condition("{{ inputs.ready }}", ctx) is True
|
|
assert evaluate_condition("{{ inputs.missing }}", ctx) is False
|
|
|
|
def test_non_string_passthrough(self):
|
|
from specify_cli.workflows.expressions import evaluate_expression
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
ctx = StepContext()
|
|
assert evaluate_expression(42, ctx) == 42
|
|
assert evaluate_expression(None, ctx) is None
|
|
|
|
def test_string_literal(self):
|
|
from specify_cli.workflows.expressions import evaluate_expression
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
ctx = StepContext()
|
|
assert evaluate_expression("{{ 'hello' }}", ctx) == "hello"
|
|
|
|
def test_numeric_literal(self):
|
|
from specify_cli.workflows.expressions import evaluate_expression
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
ctx = StepContext()
|
|
assert evaluate_expression("{{ 42 }}", ctx) == 42
|
|
|
|
def test_boolean_literal(self):
|
|
from specify_cli.workflows.expressions import evaluate_expression
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
ctx = StepContext()
|
|
assert evaluate_expression("{{ true }}", ctx) is True
|
|
assert evaluate_expression("{{ false }}", ctx) is False
|
|
|
|
def test_list_indexing(self):
|
|
from specify_cli.workflows.expressions import evaluate_expression
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
ctx = StepContext(
|
|
steps={"tasks": {"output": {"task_list": [{"file": "a.md"}, {"file": "b.md"}]}}}
|
|
)
|
|
result = evaluate_expression("{{ steps.tasks.output.task_list[0].file }}", ctx)
|
|
assert result == "a.md"
|
|
|
|
|
|
# ===== Integration Dispatch Tests =====
|
|
|
|
class TestBuildExecArgs:
|
|
"""Test build_exec_args for CLI-based integrations."""
|
|
|
|
def test_claude_exec_args(self):
|
|
from specify_cli.integrations.claude import ClaudeIntegration
|
|
impl = ClaudeIntegration()
|
|
args = impl.build_exec_args("do stuff", model="sonnet-4")
|
|
assert args[0] == "claude"
|
|
assert args[1] == "-p"
|
|
assert args[2] == "do stuff"
|
|
assert "--model" in args
|
|
assert "sonnet-4" in args
|
|
assert "--output-format" in args
|
|
|
|
def test_gemini_exec_args(self):
|
|
from specify_cli.integrations.gemini import GeminiIntegration
|
|
impl = GeminiIntegration()
|
|
args = impl.build_exec_args("do stuff", model="gemini-2.5-pro")
|
|
assert args[0] == "gemini"
|
|
assert args[1] == "-p"
|
|
assert "-m" in args
|
|
assert "gemini-2.5-pro" in args
|
|
|
|
def test_codex_exec_args(self):
|
|
from specify_cli.integrations.codex import CodexIntegration
|
|
impl = CodexIntegration()
|
|
args = impl.build_exec_args("do stuff")
|
|
assert args[0] == "codex"
|
|
assert args[1] == "exec"
|
|
assert args[2] == "do stuff"
|
|
assert "--json" in args
|
|
|
|
def test_copilot_exec_args(self, monkeypatch):
|
|
monkeypatch.delenv("SPECKIT_COPILOT_ALLOW_ALL_TOOLS", raising=False)
|
|
monkeypatch.delenv("SPECKIT_ALLOW_ALL_TOOLS", raising=False)
|
|
from specify_cli.integrations.copilot import CopilotIntegration
|
|
impl = CopilotIntegration()
|
|
args = impl.build_exec_args("do stuff", model="claude-sonnet-4-20250514")
|
|
assert args[0] == "copilot"
|
|
assert "-p" in args
|
|
assert "--yolo" in args
|
|
assert "--model" in args
|
|
|
|
def test_copilot_new_env_var_disables_yolo(self, monkeypatch):
|
|
monkeypatch.setenv("SPECKIT_COPILOT_ALLOW_ALL_TOOLS", "0")
|
|
monkeypatch.delenv("SPECKIT_ALLOW_ALL_TOOLS", raising=False)
|
|
from specify_cli.integrations.copilot import CopilotIntegration
|
|
impl = CopilotIntegration()
|
|
args = impl.build_exec_args("do stuff")
|
|
assert "--yolo" not in args
|
|
|
|
def test_copilot_deprecated_env_var_still_honoured(self, monkeypatch):
|
|
monkeypatch.delenv("SPECKIT_COPILOT_ALLOW_ALL_TOOLS", raising=False)
|
|
monkeypatch.setenv("SPECKIT_ALLOW_ALL_TOOLS", "0")
|
|
import warnings
|
|
from specify_cli.integrations.copilot import CopilotIntegration
|
|
impl = CopilotIntegration()
|
|
with warnings.catch_warnings(record=True) as w:
|
|
warnings.simplefilter("always")
|
|
args = impl.build_exec_args("do stuff")
|
|
assert "--yolo" not in args
|
|
assert any(
|
|
"SPECKIT_ALLOW_ALL_TOOLS is deprecated" in str(x.message)
|
|
and issubclass(x.category, UserWarning)
|
|
for x in w
|
|
)
|
|
|
|
def test_copilot_new_env_var_takes_precedence(self, monkeypatch):
|
|
monkeypatch.setenv("SPECKIT_COPILOT_ALLOW_ALL_TOOLS", "1")
|
|
monkeypatch.setenv("SPECKIT_ALLOW_ALL_TOOLS", "0")
|
|
from specify_cli.integrations.copilot import CopilotIntegration
|
|
impl = CopilotIntegration()
|
|
args = impl.build_exec_args("do stuff")
|
|
assert "--yolo" in args
|
|
|
|
def test_ide_only_returns_none(self):
|
|
from specify_cli.integrations.windsurf import WindsurfIntegration
|
|
impl = WindsurfIntegration()
|
|
assert impl.build_exec_args("test") is None
|
|
|
|
def test_no_model_omits_flag(self):
|
|
from specify_cli.integrations.claude import ClaudeIntegration
|
|
impl = ClaudeIntegration()
|
|
args = impl.build_exec_args("do stuff", model=None)
|
|
assert "--model" not in args
|
|
|
|
def test_no_json_omits_flag(self):
|
|
from specify_cli.integrations.claude import ClaudeIntegration
|
|
impl = ClaudeIntegration()
|
|
args = impl.build_exec_args("do stuff", output_json=False)
|
|
assert "--output-format" not in args
|
|
|
|
|
|
# ===== Step Type Tests =====
|
|
|
|
class TestCommandStep:
|
|
"""Test the command step type."""
|
|
|
|
def test_execute_basic(self):
|
|
from unittest.mock import patch
|
|
from specify_cli.workflows.steps.command import CommandStep
|
|
from specify_cli.workflows.base import StepContext, StepStatus
|
|
|
|
step = CommandStep()
|
|
ctx = StepContext(
|
|
inputs={"name": "login"},
|
|
default_integration="claude",
|
|
)
|
|
config = {
|
|
"id": "test",
|
|
"command": "speckit.specify",
|
|
"input": {"args": "{{ inputs.name }}"},
|
|
}
|
|
with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None):
|
|
result = step.execute(config, ctx)
|
|
assert result.status == StepStatus.FAILED
|
|
assert result.output["command"] == "speckit.specify"
|
|
assert result.output["integration"] == "claude"
|
|
assert result.output["input"]["args"] == "login"
|
|
|
|
def test_validate_missing_command(self):
|
|
from specify_cli.workflows.steps.command import CommandStep
|
|
|
|
step = CommandStep()
|
|
errors = step.validate({"id": "test"})
|
|
assert any("missing 'command'" in e for e in errors)
|
|
|
|
def test_step_override_integration(self):
|
|
from specify_cli.workflows.steps.command import CommandStep
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
step = CommandStep()
|
|
ctx = StepContext(default_integration="claude")
|
|
config = {
|
|
"id": "test",
|
|
"command": "speckit.plan",
|
|
"integration": "gemini",
|
|
"input": {},
|
|
}
|
|
result = step.execute(config, ctx)
|
|
assert result.output["integration"] == "gemini"
|
|
|
|
def test_step_override_model(self):
|
|
from specify_cli.workflows.steps.command import CommandStep
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
step = CommandStep()
|
|
ctx = StepContext(default_model="sonnet-4")
|
|
config = {
|
|
"id": "test",
|
|
"command": "speckit.implement",
|
|
"model": "opus-4",
|
|
"input": {},
|
|
}
|
|
result = step.execute(config, ctx)
|
|
assert result.output["model"] == "opus-4"
|
|
|
|
def test_options_merge(self):
|
|
from specify_cli.workflows.steps.command import CommandStep
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
step = CommandStep()
|
|
ctx = StepContext(default_options={"max-tokens": 8000})
|
|
config = {
|
|
"id": "test",
|
|
"command": "speckit.plan",
|
|
"options": {"thinking-budget": 32768},
|
|
"input": {},
|
|
}
|
|
result = step.execute(config, ctx)
|
|
assert result.output["options"]["max-tokens"] == 8000
|
|
assert result.output["options"]["thinking-budget"] == 32768
|
|
|
|
def test_dispatch_not_attempted_without_cli(self):
|
|
"""When the CLI tool is not installed, step should fail."""
|
|
from unittest.mock import patch
|
|
from specify_cli.workflows.steps.command import CommandStep
|
|
from specify_cli.workflows.base import StepContext, StepStatus
|
|
|
|
step = CommandStep()
|
|
ctx = StepContext(
|
|
inputs={"name": "login"},
|
|
default_integration="claude",
|
|
project_root="/tmp",
|
|
)
|
|
config = {
|
|
"id": "test",
|
|
"command": "speckit.specify",
|
|
"input": {"args": "{{ inputs.name }}"},
|
|
}
|
|
with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None):
|
|
result = step.execute(config, ctx)
|
|
assert result.status == StepStatus.FAILED
|
|
assert result.output["dispatched"] is False
|
|
assert result.error is not None
|
|
|
|
def test_dispatch_with_mock_cli(self, tmp_path, monkeypatch):
|
|
"""When the CLI is installed, dispatch invokes the command by name."""
|
|
from unittest.mock import patch, MagicMock
|
|
from specify_cli.workflows.steps.command import CommandStep
|
|
from specify_cli.workflows.base import StepContext, StepStatus
|
|
|
|
step = CommandStep()
|
|
ctx = StepContext(
|
|
inputs={"name": "login"},
|
|
default_integration="claude",
|
|
project_root=str(tmp_path),
|
|
)
|
|
config = {
|
|
"id": "test",
|
|
"command": "speckit.specify",
|
|
"input": {"args": "{{ inputs.name }}"},
|
|
}
|
|
|
|
mock_result = MagicMock()
|
|
mock_result.returncode = 0
|
|
mock_result.stdout = '{"result": "done"}'
|
|
mock_result.stderr = ""
|
|
|
|
with patch("specify_cli.workflows.steps.command.shutil.which", return_value="/usr/local/bin/claude"), \
|
|
patch("subprocess.run", return_value=mock_result) as mock_run:
|
|
result = step.execute(config, ctx)
|
|
|
|
assert result.status == StepStatus.COMPLETED
|
|
assert result.output["dispatched"] is True
|
|
assert result.output["exit_code"] == 0
|
|
# Verify the CLI was called with -p and the skill invocation
|
|
call_args = mock_run.call_args
|
|
assert call_args[0][0][0] == "claude"
|
|
assert call_args[0][0][1] == "-p"
|
|
# Claude is a SkillsIntegration so uses /speckit-specify
|
|
assert "/speckit-specify login" in call_args[0][0][2]
|
|
|
|
def test_dispatch_failure_returns_failed_status(self, tmp_path):
|
|
"""When the CLI exits non-zero, the step should fail."""
|
|
from unittest.mock import patch, MagicMock
|
|
from specify_cli.workflows.steps.command import CommandStep
|
|
from specify_cli.workflows.base import StepContext, StepStatus
|
|
|
|
step = CommandStep()
|
|
ctx = StepContext(
|
|
inputs={},
|
|
default_integration="claude",
|
|
project_root=str(tmp_path),
|
|
)
|
|
config = {
|
|
"id": "test",
|
|
"command": "speckit.specify",
|
|
"input": {"args": "test"},
|
|
}
|
|
|
|
mock_result = MagicMock()
|
|
mock_result.returncode = 1
|
|
mock_result.stdout = ""
|
|
mock_result.stderr = "API error"
|
|
|
|
with patch("specify_cli.workflows.steps.command.shutil.which", return_value="/usr/local/bin/claude"), \
|
|
patch("subprocess.run", return_value=mock_result):
|
|
result = step.execute(config, ctx)
|
|
|
|
assert result.status == StepStatus.FAILED
|
|
assert result.output["dispatched"] is True
|
|
assert result.output["exit_code"] == 1
|
|
|
|
|
|
class TestPromptStep:
|
|
"""Test the prompt step type."""
|
|
|
|
def test_execute_basic(self):
|
|
from unittest.mock import patch
|
|
from specify_cli.workflows.steps.prompt import PromptStep
|
|
from specify_cli.workflows.base import StepContext, StepStatus
|
|
|
|
step = PromptStep()
|
|
ctx = StepContext(
|
|
inputs={"file": "auth.py"},
|
|
default_integration="claude",
|
|
)
|
|
config = {
|
|
"id": "review",
|
|
"type": "prompt",
|
|
"prompt": "Review {{ inputs.file }} for security issues",
|
|
}
|
|
with patch("specify_cli.workflows.steps.prompt.shutil.which", return_value=None):
|
|
result = step.execute(config, ctx)
|
|
assert result.status == StepStatus.FAILED
|
|
assert result.output["prompt"] == "Review auth.py for security issues"
|
|
assert result.output["integration"] == "claude"
|
|
assert result.output["dispatched"] is False
|
|
|
|
def test_execute_with_step_integration(self):
|
|
from specify_cli.workflows.steps.prompt import PromptStep
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
step = PromptStep()
|
|
ctx = StepContext(default_integration="claude")
|
|
config = {
|
|
"id": "review",
|
|
"type": "prompt",
|
|
"prompt": "Summarize the codebase",
|
|
"integration": "gemini",
|
|
}
|
|
result = step.execute(config, ctx)
|
|
assert result.output["integration"] == "gemini"
|
|
|
|
def test_execute_with_model(self):
|
|
from specify_cli.workflows.steps.prompt import PromptStep
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
step = PromptStep()
|
|
ctx = StepContext(default_integration="claude", default_model="sonnet-4")
|
|
config = {
|
|
"id": "review",
|
|
"type": "prompt",
|
|
"prompt": "hello",
|
|
"model": "opus-4",
|
|
}
|
|
result = step.execute(config, ctx)
|
|
assert result.output["model"] == "opus-4"
|
|
|
|
def test_dispatch_with_mock_cli(self, tmp_path):
|
|
from unittest.mock import patch, MagicMock
|
|
from specify_cli.workflows.steps.prompt import PromptStep
|
|
from specify_cli.workflows.base import StepContext, StepStatus
|
|
|
|
step = PromptStep()
|
|
ctx = StepContext(
|
|
default_integration="claude",
|
|
project_root=str(tmp_path),
|
|
)
|
|
config = {
|
|
"id": "ask",
|
|
"type": "prompt",
|
|
"prompt": "Explain this code",
|
|
}
|
|
|
|
mock_result = MagicMock()
|
|
mock_result.returncode = 0
|
|
mock_result.stdout = "Here is the explanation"
|
|
mock_result.stderr = ""
|
|
|
|
with patch("specify_cli.workflows.steps.prompt.shutil.which", return_value="/usr/local/bin/claude"), \
|
|
patch("subprocess.run", return_value=mock_result):
|
|
result = step.execute(config, ctx)
|
|
|
|
assert result.status == StepStatus.COMPLETED
|
|
assert result.output["dispatched"] is True
|
|
assert result.output["exit_code"] == 0
|
|
|
|
def test_validate_missing_prompt(self):
|
|
from specify_cli.workflows.steps.prompt import PromptStep
|
|
|
|
step = PromptStep()
|
|
errors = step.validate({"id": "test"})
|
|
assert any("missing 'prompt'" in e for e in errors)
|
|
|
|
def test_validate_valid(self):
|
|
from specify_cli.workflows.steps.prompt import PromptStep
|
|
|
|
step = PromptStep()
|
|
errors = step.validate({"id": "test", "prompt": "do something"})
|
|
assert errors == []
|
|
|
|
|
|
class TestShellStep:
|
|
"""Test the shell step type."""
|
|
|
|
def test_execute_echo(self):
|
|
from specify_cli.workflows.steps.shell import ShellStep
|
|
from specify_cli.workflows.base import StepContext, StepStatus
|
|
|
|
step = ShellStep()
|
|
ctx = StepContext()
|
|
config = {"id": "test", "run": "echo hello"}
|
|
result = step.execute(config, ctx)
|
|
assert result.status == StepStatus.COMPLETED
|
|
assert result.output["exit_code"] == 0
|
|
assert "hello" in result.output["stdout"]
|
|
|
|
def test_execute_failure(self):
|
|
from specify_cli.workflows.steps.shell import ShellStep
|
|
from specify_cli.workflows.base import StepContext, StepStatus
|
|
|
|
step = ShellStep()
|
|
ctx = StepContext()
|
|
config = {"id": "test", "run": "exit 1"}
|
|
result = step.execute(config, ctx)
|
|
assert result.status == StepStatus.FAILED
|
|
assert result.output["exit_code"] == 1
|
|
assert result.error is not None
|
|
|
|
def test_validate_missing_run(self):
|
|
from specify_cli.workflows.steps.shell import ShellStep
|
|
|
|
step = ShellStep()
|
|
errors = step.validate({"id": "test"})
|
|
assert any("missing 'run'" in e for e in errors)
|
|
|
|
|
|
class TestGateStep:
|
|
"""Test the gate step type."""
|
|
|
|
def test_execute_returns_paused(self):
|
|
from specify_cli.workflows.steps.gate import GateStep
|
|
from specify_cli.workflows.base import StepContext, StepStatus
|
|
|
|
step = GateStep()
|
|
ctx = StepContext()
|
|
config = {
|
|
"id": "review",
|
|
"message": "Review the spec.",
|
|
"options": ["approve", "reject"],
|
|
"on_reject": "abort",
|
|
}
|
|
result = step.execute(config, ctx)
|
|
assert result.status == StepStatus.PAUSED
|
|
assert result.output["message"] == "Review the spec."
|
|
assert result.output["options"] == ["approve", "reject"]
|
|
|
|
def test_validate_missing_message(self):
|
|
from specify_cli.workflows.steps.gate import GateStep
|
|
|
|
step = GateStep()
|
|
errors = step.validate({"id": "test", "options": ["approve"]})
|
|
assert any("missing 'message'" in e for e in errors)
|
|
|
|
def test_validate_invalid_on_reject(self):
|
|
from specify_cli.workflows.steps.gate import GateStep
|
|
|
|
step = GateStep()
|
|
errors = step.validate({
|
|
"id": "test",
|
|
"message": "Review",
|
|
"on_reject": "invalid",
|
|
})
|
|
assert any("on_reject" in e for e in errors)
|
|
|
|
|
|
class TestIfThenStep:
|
|
"""Test the if/then/else step type."""
|
|
|
|
def test_execute_then_branch(self):
|
|
from specify_cli.workflows.steps.if_then import IfThenStep
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
step = IfThenStep()
|
|
ctx = StepContext(inputs={"scope": "full"})
|
|
config = {
|
|
"id": "check",
|
|
"condition": "{{ inputs.scope == 'full' }}",
|
|
"then": [{"id": "a", "command": "speckit.tasks"}],
|
|
"else": [{"id": "b", "command": "speckit.plan"}],
|
|
}
|
|
result = step.execute(config, ctx)
|
|
assert result.output["condition_result"] is True
|
|
assert len(result.next_steps) == 1
|
|
assert result.next_steps[0]["id"] == "a"
|
|
|
|
def test_execute_else_branch(self):
|
|
from specify_cli.workflows.steps.if_then import IfThenStep
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
step = IfThenStep()
|
|
ctx = StepContext(inputs={"scope": "backend"})
|
|
config = {
|
|
"id": "check",
|
|
"condition": "{{ inputs.scope == 'full' }}",
|
|
"then": [{"id": "a", "command": "speckit.tasks"}],
|
|
"else": [{"id": "b", "command": "speckit.plan"}],
|
|
}
|
|
result = step.execute(config, ctx)
|
|
assert result.output["condition_result"] is False
|
|
assert result.next_steps[0]["id"] == "b"
|
|
|
|
def test_validate_missing_condition(self):
|
|
from specify_cli.workflows.steps.if_then import IfThenStep
|
|
|
|
step = IfThenStep()
|
|
errors = step.validate({"id": "test", "then": []})
|
|
assert any("missing 'condition'" in e for e in errors)
|
|
|
|
|
|
class TestSwitchStep:
|
|
"""Test the switch step type."""
|
|
|
|
def test_execute_matches_case(self):
|
|
from specify_cli.workflows.steps.switch import SwitchStep
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
step = SwitchStep()
|
|
ctx = StepContext(
|
|
steps={"review": {"output": {"choice": "approve"}}}
|
|
)
|
|
config = {
|
|
"id": "route",
|
|
"expression": "{{ steps.review.output.choice }}",
|
|
"cases": {
|
|
"approve": [{"id": "plan", "command": "speckit.plan"}],
|
|
"reject": [{"id": "log", "type": "shell", "run": "echo rejected"}],
|
|
},
|
|
"default": [{"id": "abort", "type": "gate", "message": "Unknown"}],
|
|
}
|
|
result = step.execute(config, ctx)
|
|
assert result.output["matched_case"] == "approve"
|
|
assert result.next_steps[0]["id"] == "plan"
|
|
|
|
def test_execute_falls_to_default(self):
|
|
from specify_cli.workflows.steps.switch import SwitchStep
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
step = SwitchStep()
|
|
ctx = StepContext(
|
|
steps={"review": {"output": {"choice": "unknown"}}}
|
|
)
|
|
config = {
|
|
"id": "route",
|
|
"expression": "{{ steps.review.output.choice }}",
|
|
"cases": {
|
|
"approve": [{"id": "plan", "command": "speckit.plan"}],
|
|
},
|
|
"default": [{"id": "fallback", "type": "gate", "message": "Fallback"}],
|
|
}
|
|
result = step.execute(config, ctx)
|
|
assert result.output["matched_case"] == "__default__"
|
|
assert result.next_steps[0]["id"] == "fallback"
|
|
|
|
def test_execute_no_default_no_match(self):
|
|
from specify_cli.workflows.steps.switch import SwitchStep
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
step = SwitchStep()
|
|
ctx = StepContext(
|
|
steps={"review": {"output": {"choice": "other"}}}
|
|
)
|
|
config = {
|
|
"id": "route",
|
|
"expression": "{{ steps.review.output.choice }}",
|
|
"cases": {
|
|
"approve": [{"id": "plan", "command": "speckit.plan"}],
|
|
},
|
|
}
|
|
result = step.execute(config, ctx)
|
|
assert result.output["matched_case"] == "__default__"
|
|
assert result.next_steps == []
|
|
|
|
def test_validate_missing_expression(self):
|
|
from specify_cli.workflows.steps.switch import SwitchStep
|
|
|
|
step = SwitchStep()
|
|
errors = step.validate({"id": "test", "cases": {}})
|
|
assert any("missing 'expression'" in e for e in errors)
|
|
|
|
def test_validate_invalid_cases_and_default(self):
|
|
from specify_cli.workflows.steps.switch import SwitchStep
|
|
|
|
step = SwitchStep()
|
|
errors = step.validate({
|
|
"id": "test",
|
|
"expression": "{{ x }}",
|
|
"cases": {"a": "not-a-list"},
|
|
"default": "also-bad",
|
|
})
|
|
assert any("case 'a' must be a list" in e for e in errors)
|
|
assert any("'default' must be a list" in e for e in errors)
|
|
|
|
|
|
class TestWhileStep:
|
|
"""Test the while loop step type."""
|
|
|
|
def test_execute_condition_true(self):
|
|
from specify_cli.workflows.steps.while_loop import WhileStep
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
step = WhileStep()
|
|
ctx = StepContext(
|
|
steps={"run-tests": {"output": {"exit_code": 1}}}
|
|
)
|
|
config = {
|
|
"id": "retry",
|
|
"condition": "{{ steps.run-tests.output.exit_code != 0 }}",
|
|
"max_iterations": 5,
|
|
"steps": [{"id": "fix", "command": "speckit.implement"}],
|
|
}
|
|
result = step.execute(config, ctx)
|
|
assert result.output["condition_result"] is True
|
|
assert len(result.next_steps) == 1
|
|
|
|
def test_execute_condition_false(self):
|
|
from specify_cli.workflows.steps.while_loop import WhileStep
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
step = WhileStep()
|
|
ctx = StepContext(
|
|
steps={"run-tests": {"output": {"exit_code": 0}}}
|
|
)
|
|
config = {
|
|
"id": "retry",
|
|
"condition": "{{ steps.run-tests.output.exit_code != 0 }}",
|
|
"max_iterations": 5,
|
|
"steps": [{"id": "fix", "command": "speckit.implement"}],
|
|
}
|
|
result = step.execute(config, ctx)
|
|
assert result.output["condition_result"] is False
|
|
assert result.next_steps == []
|
|
|
|
def test_validate_missing_fields(self):
|
|
from specify_cli.workflows.steps.while_loop import WhileStep
|
|
|
|
step = WhileStep()
|
|
errors = step.validate({"id": "test", "steps": []})
|
|
assert any("missing 'condition'" in e for e in errors)
|
|
# max_iterations is optional (defaults to 10)
|
|
|
|
def test_validate_invalid_max_iterations(self):
|
|
from specify_cli.workflows.steps.while_loop import WhileStep
|
|
|
|
step = WhileStep()
|
|
errors = step.validate({"id": "test", "condition": "{{ true }}", "max_iterations": 0, "steps": []})
|
|
assert any("must be an integer >= 1" in e for e in errors)
|
|
|
|
|
|
class TestDoWhileStep:
|
|
"""Test the do-while loop step type."""
|
|
|
|
def test_execute_always_runs_once(self):
|
|
from specify_cli.workflows.steps.do_while import DoWhileStep
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
step = DoWhileStep()
|
|
ctx = StepContext()
|
|
config = {
|
|
"id": "cycle",
|
|
"condition": "{{ false }}",
|
|
"max_iterations": 3,
|
|
"steps": [{"id": "refine", "command": "speckit.specify"}],
|
|
}
|
|
result = step.execute(config, ctx)
|
|
assert len(result.next_steps) == 1
|
|
assert result.output["loop_type"] == "do-while"
|
|
assert result.output["condition"] == "{{ false }}"
|
|
|
|
def test_execute_with_true_condition(self):
|
|
from specify_cli.workflows.steps.do_while import DoWhileStep
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
step = DoWhileStep()
|
|
ctx = StepContext()
|
|
config = {
|
|
"id": "cycle",
|
|
"condition": "{{ true }}",
|
|
"max_iterations": 5,
|
|
"steps": [{"id": "work", "command": "speckit.plan"}],
|
|
}
|
|
result = step.execute(config, ctx)
|
|
# Body always executes on first call regardless of condition
|
|
assert len(result.next_steps) == 1
|
|
assert result.output["max_iterations"] == 5
|
|
|
|
def test_execute_empty_steps(self):
|
|
from specify_cli.workflows.steps.do_while import DoWhileStep
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
step = DoWhileStep()
|
|
ctx = StepContext()
|
|
config = {
|
|
"id": "empty",
|
|
"condition": "{{ false }}",
|
|
"max_iterations": 1,
|
|
"steps": [],
|
|
}
|
|
result = step.execute(config, ctx)
|
|
assert result.next_steps == []
|
|
assert result.status.value == "completed"
|
|
|
|
def test_validate_missing_fields(self):
|
|
from specify_cli.workflows.steps.do_while import DoWhileStep
|
|
|
|
step = DoWhileStep()
|
|
errors = step.validate({"id": "test", "steps": []})
|
|
assert any("missing 'condition'" in e for e in errors)
|
|
# max_iterations is optional (defaults to 10)
|
|
|
|
def test_validate_steps_not_list(self):
|
|
from specify_cli.workflows.steps.do_while import DoWhileStep
|
|
|
|
step = DoWhileStep()
|
|
errors = step.validate({
|
|
"id": "test",
|
|
"condition": "{{ true }}",
|
|
"max_iterations": 3,
|
|
"steps": "not-a-list",
|
|
})
|
|
assert any("'steps' must be a list" in e for e in errors)
|
|
|
|
|
|
class TestFanOutStep:
|
|
"""Test the fan-out step type."""
|
|
|
|
def test_execute_with_items(self):
|
|
from specify_cli.workflows.steps.fan_out import FanOutStep
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
step = FanOutStep()
|
|
ctx = StepContext(
|
|
steps={"tasks": {"output": {"task_list": [
|
|
{"file": "a.md"},
|
|
{"file": "b.md"},
|
|
]}}}
|
|
)
|
|
config = {
|
|
"id": "parallel",
|
|
"items": "{{ steps.tasks.output.task_list }}",
|
|
"max_concurrency": 3,
|
|
"step": {"id": "impl", "command": "speckit.implement"},
|
|
}
|
|
result = step.execute(config, ctx)
|
|
assert result.output["item_count"] == 2
|
|
assert result.output["max_concurrency"] == 3
|
|
|
|
def test_execute_non_list_items_resolves_empty(self):
|
|
from specify_cli.workflows.steps.fan_out import FanOutStep
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
step = FanOutStep()
|
|
ctx = StepContext()
|
|
config = {
|
|
"id": "parallel",
|
|
"items": "{{ undefined_var }}",
|
|
"step": {"id": "impl", "command": "speckit.implement"},
|
|
}
|
|
result = step.execute(config, ctx)
|
|
assert result.output["item_count"] == 0
|
|
assert result.output["items"] == []
|
|
|
|
def test_validate_missing_fields(self):
|
|
from specify_cli.workflows.steps.fan_out import FanOutStep
|
|
|
|
step = FanOutStep()
|
|
errors = step.validate({"id": "test"})
|
|
assert any("missing 'items'" in e for e in errors)
|
|
assert any("missing 'step'" in e for e in errors)
|
|
|
|
def test_validate_step_not_mapping(self):
|
|
from specify_cli.workflows.steps.fan_out import FanOutStep
|
|
|
|
step = FanOutStep()
|
|
errors = step.validate({
|
|
"id": "test",
|
|
"items": "{{ x }}",
|
|
"step": "not-a-dict",
|
|
})
|
|
assert any("'step' must be a mapping" in e for e in errors)
|
|
|
|
|
|
class TestFanInStep:
|
|
"""Test the fan-in step type."""
|
|
|
|
def test_execute_collects_results(self):
|
|
from specify_cli.workflows.steps.fan_in import FanInStep
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
step = FanInStep()
|
|
ctx = StepContext(
|
|
steps={
|
|
"parallel": {"output": {"item_count": 2, "status": "done"}}
|
|
}
|
|
)
|
|
config = {
|
|
"id": "collect",
|
|
"wait_for": ["parallel"],
|
|
"output": {},
|
|
}
|
|
result = step.execute(config, ctx)
|
|
assert len(result.output["results"]) == 1
|
|
assert result.output["results"][0]["item_count"] == 2
|
|
|
|
def test_execute_multiple_wait_for(self):
|
|
from specify_cli.workflows.steps.fan_in import FanInStep
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
step = FanInStep()
|
|
ctx = StepContext(
|
|
steps={
|
|
"task-a": {"output": {"file": "a.md"}},
|
|
"task-b": {"output": {"file": "b.md"}},
|
|
}
|
|
)
|
|
config = {
|
|
"id": "collect",
|
|
"wait_for": ["task-a", "task-b"],
|
|
"output": {},
|
|
}
|
|
result = step.execute(config, ctx)
|
|
assert len(result.output["results"]) == 2
|
|
assert result.output["results"][0]["file"] == "a.md"
|
|
assert result.output["results"][1]["file"] == "b.md"
|
|
|
|
def test_execute_missing_wait_for_step(self):
|
|
from specify_cli.workflows.steps.fan_in import FanInStep
|
|
from specify_cli.workflows.base import StepContext
|
|
|
|
step = FanInStep()
|
|
ctx = StepContext(steps={})
|
|
config = {
|
|
"id": "collect",
|
|
"wait_for": ["nonexistent"],
|
|
"output": {},
|
|
}
|
|
result = step.execute(config, ctx)
|
|
assert result.output["results"] == [{}]
|
|
|
|
def test_validate_empty_wait_for(self):
|
|
from specify_cli.workflows.steps.fan_in import FanInStep
|
|
|
|
step = FanInStep()
|
|
errors = step.validate({"id": "test", "wait_for": []})
|
|
assert any("non-empty list" in e for e in errors)
|
|
|
|
def test_validate_wait_for_not_list(self):
|
|
from specify_cli.workflows.steps.fan_in import FanInStep
|
|
|
|
step = FanInStep()
|
|
errors = step.validate({"id": "test", "wait_for": "not-a-list"})
|
|
assert any("non-empty list" in e for e in errors)
|
|
|
|
|
|
# ===== Workflow Definition Tests =====
|
|
|
|
class TestWorkflowDefinition:
|
|
"""Test WorkflowDefinition loading and parsing."""
|
|
|
|
def test_from_yaml(self, sample_workflow_file):
|
|
from specify_cli.workflows.engine import WorkflowDefinition
|
|
|
|
definition = WorkflowDefinition.from_yaml(sample_workflow_file)
|
|
assert definition.id == "test-workflow"
|
|
assert definition.name == "Test Workflow"
|
|
assert definition.version == "1.0.0"
|
|
assert len(definition.steps) == 2
|
|
|
|
def test_from_string(self, sample_workflow_yaml):
|
|
from specify_cli.workflows.engine import WorkflowDefinition
|
|
|
|
definition = WorkflowDefinition.from_string(sample_workflow_yaml)
|
|
assert definition.id == "test-workflow"
|
|
assert len(definition.inputs) == 2
|
|
|
|
def test_from_string_invalid(self):
|
|
from specify_cli.workflows.engine import WorkflowDefinition
|
|
|
|
with pytest.raises(ValueError, match="must be a mapping"):
|
|
WorkflowDefinition.from_string("- just a list")
|
|
|
|
def test_inputs_parsed(self, sample_workflow_yaml):
|
|
from specify_cli.workflows.engine import WorkflowDefinition
|
|
|
|
definition = WorkflowDefinition.from_string(sample_workflow_yaml)
|
|
assert "spec" in definition.inputs
|
|
assert definition.inputs["spec"]["required"] is True
|
|
assert definition.inputs["scope"]["default"] == "full"
|
|
|
|
|
|
# ===== Workflow Validation Tests =====
|
|
|
|
class TestWorkflowValidation:
|
|
"""Test workflow validation."""
|
|
|
|
def test_valid_workflow(self, sample_workflow_yaml):
|
|
from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow
|
|
|
|
definition = WorkflowDefinition.from_string(sample_workflow_yaml)
|
|
errors = validate_workflow(definition)
|
|
assert errors == []
|
|
|
|
def test_missing_id(self):
|
|
from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow
|
|
|
|
definition = WorkflowDefinition.from_string("""
|
|
workflow:
|
|
name: "Test"
|
|
version: "1.0.0"
|
|
steps:
|
|
- id: step-one
|
|
command: speckit.specify
|
|
""")
|
|
errors = validate_workflow(definition)
|
|
assert any("workflow.id" in e for e in errors)
|
|
|
|
def test_invalid_id_format(self):
|
|
from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow
|
|
|
|
definition = WorkflowDefinition.from_string("""
|
|
workflow:
|
|
id: "Invalid ID!"
|
|
name: "Test"
|
|
version: "1.0.0"
|
|
steps:
|
|
- id: step-one
|
|
command: speckit.specify
|
|
""")
|
|
errors = validate_workflow(definition)
|
|
assert any("lowercase alphanumeric" in e for e in errors)
|
|
|
|
def test_no_steps(self):
|
|
from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow
|
|
|
|
definition = WorkflowDefinition.from_string("""
|
|
workflow:
|
|
id: "test"
|
|
name: "Test"
|
|
version: "1.0.0"
|
|
steps: []
|
|
""")
|
|
errors = validate_workflow(definition)
|
|
assert any("no steps" in e.lower() for e in errors)
|
|
|
|
def test_duplicate_step_ids(self):
|
|
from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow
|
|
|
|
definition = WorkflowDefinition.from_string("""
|
|
workflow:
|
|
id: "test"
|
|
name: "Test"
|
|
version: "1.0.0"
|
|
steps:
|
|
- id: same-id
|
|
command: speckit.specify
|
|
- id: same-id
|
|
command: speckit.plan
|
|
""")
|
|
errors = validate_workflow(definition)
|
|
assert any("Duplicate" in e for e in errors)
|
|
|
|
def test_invalid_step_type(self):
|
|
from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow
|
|
|
|
definition = WorkflowDefinition.from_string("""
|
|
workflow:
|
|
id: "test"
|
|
name: "Test"
|
|
version: "1.0.0"
|
|
steps:
|
|
- id: bad
|
|
type: nonexistent
|
|
""")
|
|
errors = validate_workflow(definition)
|
|
assert any("invalid type" in e.lower() for e in errors)
|
|
|
|
def test_nested_step_validation(self):
|
|
from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow
|
|
|
|
definition = WorkflowDefinition.from_string("""
|
|
workflow:
|
|
id: "test"
|
|
name: "Test"
|
|
version: "1.0.0"
|
|
steps:
|
|
- id: branch
|
|
type: if
|
|
condition: "{{ true }}"
|
|
then:
|
|
- id: nested-a
|
|
command: speckit.specify
|
|
else:
|
|
- id: nested-b
|
|
command: speckit.plan
|
|
""")
|
|
errors = validate_workflow(definition)
|
|
assert errors == []
|
|
|
|
def test_invalid_input_type(self):
|
|
from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow
|
|
|
|
definition = WorkflowDefinition.from_string("""
|
|
workflow:
|
|
id: "test"
|
|
name: "Test"
|
|
version: "1.0.0"
|
|
inputs:
|
|
bad:
|
|
type: array
|
|
steps:
|
|
- id: step-one
|
|
command: speckit.specify
|
|
""")
|
|
errors = validate_workflow(definition)
|
|
assert any("invalid type" in e.lower() for e in errors)
|
|
|
|
|
|
# ===== Workflow Engine Tests =====
|
|
|
|
class TestWorkflowEngine:
|
|
"""Test WorkflowEngine execution."""
|
|
|
|
def test_load_from_file(self, sample_workflow_file, project_dir):
|
|
from specify_cli.workflows.engine import WorkflowEngine
|
|
|
|
engine = WorkflowEngine(project_dir)
|
|
definition = engine.load_workflow(str(sample_workflow_file))
|
|
assert definition.id == "test-workflow"
|
|
|
|
def test_load_from_installed_id(self, sample_workflow_file, project_dir):
|
|
from specify_cli.workflows.engine import WorkflowEngine
|
|
|
|
engine = WorkflowEngine(project_dir)
|
|
definition = engine.load_workflow("test-workflow")
|
|
assert definition.id == "test-workflow"
|
|
|
|
def test_load_not_found(self, project_dir):
|
|
from specify_cli.workflows.engine import WorkflowEngine
|
|
|
|
engine = WorkflowEngine(project_dir)
|
|
with pytest.raises(FileNotFoundError):
|
|
engine.load_workflow("nonexistent")
|
|
|
|
def test_execute_simple_workflow(self, project_dir):
|
|
from unittest.mock import patch
|
|
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
|
|
from specify_cli.workflows.base import RunStatus
|
|
|
|
yaml_str = """
|
|
schema_version: "1.0"
|
|
workflow:
|
|
id: "simple"
|
|
name: "Simple"
|
|
version: "1.0.0"
|
|
integration: claude
|
|
inputs:
|
|
name:
|
|
type: string
|
|
default: "test"
|
|
steps:
|
|
- id: step-one
|
|
command: speckit.specify
|
|
input:
|
|
args: "{{ inputs.name }}"
|
|
"""
|
|
definition = WorkflowDefinition.from_string(yaml_str)
|
|
engine = WorkflowEngine(project_dir)
|
|
with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None):
|
|
state = engine.execute(definition, {"name": "login"})
|
|
|
|
assert state.status == RunStatus.FAILED
|
|
assert "step-one" in state.step_results
|
|
assert state.step_results["step-one"]["output"]["command"] == "speckit.specify"
|
|
assert state.step_results["step-one"]["output"]["input"]["args"] == "login"
|
|
|
|
def test_execute_with_gate_pauses(self, project_dir):
|
|
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
|
|
from specify_cli.workflows.base import RunStatus
|
|
|
|
yaml_str = """
|
|
schema_version: "1.0"
|
|
workflow:
|
|
id: "gated"
|
|
name: "Gated"
|
|
version: "1.0.0"
|
|
steps:
|
|
- id: step-one
|
|
type: shell
|
|
run: "echo test"
|
|
- id: gate
|
|
type: gate
|
|
message: "Review?"
|
|
options: [approve, reject]
|
|
on_reject: abort
|
|
- id: step-two
|
|
type: shell
|
|
run: "echo done"
|
|
"""
|
|
definition = WorkflowDefinition.from_string(yaml_str)
|
|
engine = WorkflowEngine(project_dir)
|
|
state = engine.execute(definition)
|
|
|
|
assert state.status == RunStatus.PAUSED
|
|
assert "gate" in state.step_results
|
|
assert state.step_results["gate"]["status"] == "paused"
|
|
|
|
def test_execute_with_shell_step(self, project_dir):
|
|
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
|
|
from specify_cli.workflows.base import RunStatus
|
|
|
|
yaml_str = """
|
|
schema_version: "1.0"
|
|
workflow:
|
|
id: "shell-test"
|
|
name: "Shell Test"
|
|
version: "1.0.0"
|
|
steps:
|
|
- id: echo
|
|
type: shell
|
|
run: "echo workflow-output"
|
|
"""
|
|
definition = WorkflowDefinition.from_string(yaml_str)
|
|
engine = WorkflowEngine(project_dir)
|
|
state = engine.execute(definition)
|
|
|
|
assert state.status == RunStatus.COMPLETED
|
|
assert "workflow-output" in state.step_results["echo"]["output"]["stdout"]
|
|
|
|
def test_execute_with_if_then(self, project_dir):
|
|
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
|
|
from specify_cli.workflows.base import RunStatus
|
|
|
|
yaml_str = """
|
|
schema_version: "1.0"
|
|
workflow:
|
|
id: "branching"
|
|
name: "Branching"
|
|
version: "1.0.0"
|
|
inputs:
|
|
scope:
|
|
type: string
|
|
default: "full"
|
|
steps:
|
|
- id: check
|
|
type: if
|
|
condition: "{{ inputs.scope == 'full' }}"
|
|
then:
|
|
- id: full-tasks
|
|
type: shell
|
|
run: "echo full"
|
|
else:
|
|
- id: partial-tasks
|
|
type: shell
|
|
run: "echo partial"
|
|
"""
|
|
definition = WorkflowDefinition.from_string(yaml_str)
|
|
engine = WorkflowEngine(project_dir)
|
|
state = engine.execute(definition, {"scope": "full"})
|
|
|
|
assert state.status == RunStatus.COMPLETED
|
|
assert "full-tasks" in state.step_results
|
|
assert "partial-tasks" not in state.step_results
|
|
|
|
def test_execute_missing_required_input(self, project_dir):
|
|
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
|
|
|
|
yaml_str = """
|
|
schema_version: "1.0"
|
|
workflow:
|
|
id: "needs-input"
|
|
name: "Needs Input"
|
|
version: "1.0.0"
|
|
inputs:
|
|
name:
|
|
type: string
|
|
required: true
|
|
steps:
|
|
- id: step-one
|
|
command: speckit.specify
|
|
input:
|
|
args: "{{ inputs.name }}"
|
|
"""
|
|
definition = WorkflowDefinition.from_string(yaml_str)
|
|
engine = WorkflowEngine(project_dir)
|
|
|
|
with pytest.raises(ValueError, match="Required input"):
|
|
engine.execute(definition, {})
|
|
|
|
|
|
# ===== State Persistence Tests =====
|
|
|
|
class TestRunState:
|
|
"""Test RunState persistence and loading."""
|
|
|
|
def test_save_and_load(self, project_dir):
|
|
from specify_cli.workflows.engine import RunState
|
|
from specify_cli.workflows.base import RunStatus
|
|
|
|
state = RunState(
|
|
run_id="test-run",
|
|
workflow_id="test-workflow",
|
|
project_root=project_dir,
|
|
)
|
|
state.status = RunStatus.RUNNING
|
|
state.inputs = {"name": "login"}
|
|
state.step_results = {
|
|
"step-one": {
|
|
"output": {"file": "spec.md"},
|
|
"status": "completed",
|
|
}
|
|
}
|
|
state.save()
|
|
|
|
loaded = RunState.load("test-run", project_dir)
|
|
assert loaded.run_id == "test-run"
|
|
assert loaded.workflow_id == "test-workflow"
|
|
assert loaded.status == RunStatus.RUNNING
|
|
assert loaded.inputs == {"name": "login"}
|
|
assert "step-one" in loaded.step_results
|
|
|
|
def test_load_not_found(self, project_dir):
|
|
from specify_cli.workflows.engine import RunState
|
|
|
|
with pytest.raises(FileNotFoundError):
|
|
RunState.load("nonexistent", project_dir)
|
|
|
|
def test_append_log(self, project_dir):
|
|
from specify_cli.workflows.engine import RunState
|
|
|
|
state = RunState(
|
|
run_id="log-test",
|
|
workflow_id="test",
|
|
project_root=project_dir,
|
|
)
|
|
state.append_log({"event": "test_event", "data": "hello"})
|
|
|
|
log_file = state.runs_dir / "log.jsonl"
|
|
assert log_file.exists()
|
|
lines = log_file.read_text().strip().split("\n")
|
|
entry = json.loads(lines[0])
|
|
assert entry["event"] == "test_event"
|
|
assert "timestamp" in entry
|
|
|
|
|
|
class TestListRuns:
|
|
"""Test listing workflow runs."""
|
|
|
|
def test_list_empty(self, project_dir):
|
|
from specify_cli.workflows.engine import WorkflowEngine
|
|
|
|
engine = WorkflowEngine(project_dir)
|
|
assert engine.list_runs() == []
|
|
|
|
def test_list_after_execution(self, project_dir):
|
|
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
|
|
|
|
yaml_str = """
|
|
schema_version: "1.0"
|
|
workflow:
|
|
id: "list-test"
|
|
name: "List Test"
|
|
version: "1.0.0"
|
|
steps:
|
|
- id: step-one
|
|
type: shell
|
|
run: "echo test"
|
|
"""
|
|
definition = WorkflowDefinition.from_string(yaml_str)
|
|
engine = WorkflowEngine(project_dir)
|
|
engine.execute(definition)
|
|
|
|
runs = engine.list_runs()
|
|
assert len(runs) == 1
|
|
assert runs[0]["workflow_id"] == "list-test"
|
|
|
|
|
|
# ===== Workflow Registry Tests =====
|
|
|
|
class TestWorkflowRegistry:
|
|
"""Test WorkflowRegistry operations."""
|
|
|
|
def test_add_and_get(self, project_dir):
|
|
from specify_cli.workflows.catalog import WorkflowRegistry
|
|
|
|
registry = WorkflowRegistry(project_dir)
|
|
registry.add("test-wf", {"name": "Test", "version": "1.0.0"})
|
|
|
|
entry = registry.get("test-wf")
|
|
assert entry is not None
|
|
assert entry["name"] == "Test"
|
|
assert "installed_at" in entry
|
|
|
|
def test_remove(self, project_dir):
|
|
from specify_cli.workflows.catalog import WorkflowRegistry
|
|
|
|
registry = WorkflowRegistry(project_dir)
|
|
registry.add("test-wf", {"name": "Test"})
|
|
assert registry.is_installed("test-wf")
|
|
|
|
registry.remove("test-wf")
|
|
assert not registry.is_installed("test-wf")
|
|
|
|
def test_list(self, project_dir):
|
|
from specify_cli.workflows.catalog import WorkflowRegistry
|
|
|
|
registry = WorkflowRegistry(project_dir)
|
|
registry.add("wf-a", {"name": "A"})
|
|
registry.add("wf-b", {"name": "B"})
|
|
|
|
installed = registry.list()
|
|
assert "wf-a" in installed
|
|
assert "wf-b" in installed
|
|
|
|
def test_is_installed(self, project_dir):
|
|
from specify_cli.workflows.catalog import WorkflowRegistry
|
|
|
|
registry = WorkflowRegistry(project_dir)
|
|
assert not registry.is_installed("missing")
|
|
|
|
registry.add("exists", {"name": "Exists"})
|
|
assert registry.is_installed("exists")
|
|
|
|
def test_persistence(self, project_dir):
|
|
from specify_cli.workflows.catalog import WorkflowRegistry
|
|
|
|
registry1 = WorkflowRegistry(project_dir)
|
|
registry1.add("test-wf", {"name": "Test"})
|
|
|
|
# Load fresh
|
|
registry2 = WorkflowRegistry(project_dir)
|
|
assert registry2.is_installed("test-wf")
|
|
|
|
|
|
# ===== Workflow Catalog Tests =====
|
|
|
|
class TestWorkflowCatalog:
|
|
"""Test WorkflowCatalog catalog resolution."""
|
|
|
|
def test_default_catalogs(self, project_dir):
|
|
from specify_cli.workflows.catalog import WorkflowCatalog
|
|
|
|
catalog = WorkflowCatalog(project_dir)
|
|
entries = catalog.get_active_catalogs()
|
|
assert len(entries) == 2
|
|
assert entries[0].name == "default"
|
|
assert entries[1].name == "community"
|
|
|
|
def test_env_var_override(self, project_dir, monkeypatch):
|
|
from specify_cli.workflows.catalog import WorkflowCatalog
|
|
|
|
monkeypatch.setenv("SPECKIT_WORKFLOW_CATALOG_URL", "https://example.com/catalog.json")
|
|
catalog = WorkflowCatalog(project_dir)
|
|
entries = catalog.get_active_catalogs()
|
|
assert len(entries) == 1
|
|
assert entries[0].name == "env-override"
|
|
assert entries[0].url == "https://example.com/catalog.json"
|
|
|
|
def test_project_level_config(self, project_dir):
|
|
from specify_cli.workflows.catalog import WorkflowCatalog
|
|
|
|
config_path = project_dir / ".specify" / "workflow-catalogs.yml"
|
|
config_path.write_text(yaml.dump({
|
|
"catalogs": [{
|
|
"name": "custom",
|
|
"url": "https://example.com/wf-catalog.json",
|
|
"priority": 1,
|
|
"install_allowed": True,
|
|
}]
|
|
}))
|
|
|
|
catalog = WorkflowCatalog(project_dir)
|
|
entries = catalog.get_active_catalogs()
|
|
assert len(entries) == 1
|
|
assert entries[0].name == "custom"
|
|
|
|
def test_validate_url_http_rejected(self, project_dir):
|
|
from specify_cli.workflows.catalog import WorkflowCatalog, WorkflowValidationError
|
|
|
|
catalog = WorkflowCatalog(project_dir)
|
|
with pytest.raises(WorkflowValidationError, match="HTTPS"):
|
|
catalog._validate_catalog_url("http://evil.com/catalog.json")
|
|
|
|
def test_validate_url_localhost_http_allowed(self, project_dir):
|
|
from specify_cli.workflows.catalog import WorkflowCatalog
|
|
|
|
catalog = WorkflowCatalog(project_dir)
|
|
# Should not raise
|
|
catalog._validate_catalog_url("http://localhost:8080/catalog.json")
|
|
|
|
def test_add_catalog(self, project_dir):
|
|
from specify_cli.workflows.catalog import WorkflowCatalog
|
|
|
|
catalog = WorkflowCatalog(project_dir)
|
|
catalog.add_catalog("https://example.com/new-catalog.json", "my-catalog")
|
|
|
|
config_path = project_dir / ".specify" / "workflow-catalogs.yml"
|
|
assert config_path.exists()
|
|
data = yaml.safe_load(config_path.read_text())
|
|
assert len(data["catalogs"]) == 1
|
|
assert data["catalogs"][0]["url"] == "https://example.com/new-catalog.json"
|
|
|
|
def test_add_catalog_duplicate_rejected(self, project_dir):
|
|
from specify_cli.workflows.catalog import WorkflowCatalog, WorkflowValidationError
|
|
|
|
catalog = WorkflowCatalog(project_dir)
|
|
catalog.add_catalog("https://example.com/catalog.json")
|
|
|
|
with pytest.raises(WorkflowValidationError, match="already configured"):
|
|
catalog.add_catalog("https://example.com/catalog.json")
|
|
|
|
def test_remove_catalog(self, project_dir):
|
|
from specify_cli.workflows.catalog import WorkflowCatalog
|
|
|
|
catalog = WorkflowCatalog(project_dir)
|
|
catalog.add_catalog("https://example.com/c1.json", "first")
|
|
catalog.add_catalog("https://example.com/c2.json", "second")
|
|
|
|
removed = catalog.remove_catalog(0)
|
|
assert removed == "first"
|
|
|
|
config_path = project_dir / ".specify" / "workflow-catalogs.yml"
|
|
data = yaml.safe_load(config_path.read_text())
|
|
assert len(data["catalogs"]) == 1
|
|
|
|
def test_remove_catalog_invalid_index(self, project_dir):
|
|
from specify_cli.workflows.catalog import WorkflowCatalog, WorkflowValidationError
|
|
|
|
catalog = WorkflowCatalog(project_dir)
|
|
catalog.add_catalog("https://example.com/c1.json")
|
|
|
|
with pytest.raises(WorkflowValidationError, match="out of range"):
|
|
catalog.remove_catalog(5)
|
|
|
|
def test_get_catalog_configs(self, project_dir):
|
|
from specify_cli.workflows.catalog import WorkflowCatalog
|
|
|
|
catalog = WorkflowCatalog(project_dir)
|
|
configs = catalog.get_catalog_configs()
|
|
assert len(configs) == 2
|
|
assert configs[0]["name"] == "default"
|
|
assert isinstance(configs[0]["install_allowed"], bool)
|
|
|
|
|
|
# ===== Integration Test =====
|
|
|
|
class TestWorkflowIntegration:
|
|
"""End-to-end workflow execution tests."""
|
|
|
|
def test_full_sequential_workflow(self, project_dir):
|
|
"""Execute a multi-step sequential workflow end to end."""
|
|
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
|
|
from specify_cli.workflows.base import RunStatus
|
|
|
|
yaml_str = """
|
|
schema_version: "1.0"
|
|
workflow:
|
|
id: "e2e-test"
|
|
name: "E2E Test"
|
|
version: "1.0.0"
|
|
integration: claude
|
|
inputs:
|
|
feature:
|
|
type: string
|
|
default: "login"
|
|
steps:
|
|
- id: specify
|
|
type: shell
|
|
run: "echo speckit.specify {{ inputs.feature }}"
|
|
|
|
- id: check-scope
|
|
type: if
|
|
condition: "{{ inputs.feature == 'login' }}"
|
|
then:
|
|
- id: echo-full
|
|
type: shell
|
|
run: "echo full scope"
|
|
else:
|
|
- id: echo-partial
|
|
type: shell
|
|
run: "echo partial scope"
|
|
|
|
- id: plan
|
|
type: shell
|
|
run: "echo speckit.plan"
|
|
"""
|
|
definition = WorkflowDefinition.from_string(yaml_str)
|
|
engine = WorkflowEngine(project_dir)
|
|
state = engine.execute(definition)
|
|
|
|
assert state.status == RunStatus.COMPLETED
|
|
assert "specify" in state.step_results
|
|
assert "check-scope" in state.step_results
|
|
assert "echo-full" in state.step_results
|
|
assert "echo-partial" not in state.step_results
|
|
assert "plan" in state.step_results
|
|
|
|
def test_switch_workflow(self, project_dir):
|
|
"""Test switch step type in a workflow."""
|
|
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
|
|
from specify_cli.workflows.base import RunStatus
|
|
|
|
yaml_str = """
|
|
schema_version: "1.0"
|
|
workflow:
|
|
id: "switch-test"
|
|
name: "Switch Test"
|
|
version: "1.0.0"
|
|
inputs:
|
|
action:
|
|
type: string
|
|
default: "plan"
|
|
steps:
|
|
- id: route
|
|
type: switch
|
|
expression: "{{ inputs.action }}"
|
|
cases:
|
|
specify:
|
|
- id: do-specify
|
|
type: shell
|
|
run: "echo specify"
|
|
plan:
|
|
- id: do-plan
|
|
type: shell
|
|
run: "echo plan"
|
|
default:
|
|
- id: do-default
|
|
type: shell
|
|
run: "echo default"
|
|
"""
|
|
definition = WorkflowDefinition.from_string(yaml_str)
|
|
engine = WorkflowEngine(project_dir)
|
|
state = engine.execute(definition)
|
|
|
|
assert state.status == RunStatus.COMPLETED
|
|
assert "do-plan" in state.step_results
|
|
assert "do-specify" not in state.step_results
|