mirror of
https://github.com/github/spec-kit.git
synced 2026-07-04 04:45:43 +08:00
* feat(workflows): expose `{{ context.run_id }}` template variable
Closes #2590.
Surfaces the engine-assigned run id (the same 8-character hex
string Spec Kit prints as `Run ID:` at the end of
`workflow run`) as a workflow template variable so YAML
authors can reference it from shell `run:`, command
`input.args:`, switch `expression:`, and any other field that
already evaluates `{{ ... }}` templates.
### Why
The run id is the natural join key between a Spec Kit workflow
run and downstream artifacts, telemetry, or per-run scratch
state. Today the operator sees it in stdout but workflows
themselves cannot reference it — there was no way to stamp a
log line, name a scratch directory, or tag an artifact with
the same id Spec Kit assigned.
The three motivating use cases from the issue:
1. Telemetry / observability — stamp logs and events with the
run id so external systems can join workflow runs to
downstream artifacts.
2. Per-run scratch / isolation — interactive operator commands
that need their own state directory under
`/tmp/run-<id>/`.
3. Run-id in artifact metadata — stable join key from artifact
back to the producing run.
### Implementation
`StepContext.run_id` is already populated by `WorkflowEngine`
in both `execute()` and `resume()`. The only gap was the
template namespace builder.
`_build_namespace` (in `workflows/expressions.py`) now adds a
`context` key alongside the existing `inputs`, `steps`,
`item`, and `fan_in` namespaces:
```python
ns["context"] = {"run_id": run_id}
```
The value is always present (even outside a run) and falls
back to an empty string when no run is active. Workflows
referencing `{{ context.run_id }}` therefore never error — a
hard requirement from the issue's acceptance criteria for
dry-run, validation, and ad-hoc evaluator usage.
### Default behaviour preserved
Workflows that do not reference `{{ context.run_id }}` are
byte-equivalent to before this change. The `context`
namespace is added unconditionally to keep template
resolution branch-free, but its presence has no observable
effect when nothing references it.
### Tests
`TestExpressions` (unit-level) gains three tests:
- `test_context_run_id_resolves` — direct lookup against a
`StepContext(run_id=...)`.
- `test_context_run_id_defaults_to_empty_when_unset` —
graceful default outside a run context.
- `test_context_run_id_string_interpolation` — mixed
template (e.g. `"RUN_ID={{ context.run_id }}"`).
`TestContextRunId` (end-to-end) covers the three step types
the acceptance criteria called out, plus one regression guard:
- `test_shell_run_resolves_run_id` — `run:` field
substitution, verified via captured stdout.
- `test_command_input_args_resolves_run_id` — `input.args:`
resolution, captured in step output even when CLI dispatch
is unavailable (the artifact-metadata use case).
- `test_switch_expression_matches_on_run_id` — switch
matches against the resolved value, proving the run id is a
first-class value in the expression engine, not just an
interpolation token.
- `test_workflow_without_context_reference_unchanged` —
locks the byte-equivalent default required by the issue.
### Docs
`workflows/README.md` gains a "Runtime Context" subsection
under "Expressions" documenting the new namespace and the
three canonical use patterns (telemetry, per-run scratch,
artifact metadata).
* test(workflows): drop inline double-quotes in run_id shell tests
`test_shell_run_resolves_run_id` and
`test_switch_expression_matches_on_run_id` used
`run: 'echo "RUN_ID={{ context.run_id }}"'` with inner double-quotes
around the echo argument. Bash/sh strips those quotes before invoking
echo, but cmd.exe (used on Windows when `shell=True`) treats them
as literal characters and emits `"RUN_ID=abc12345"` — failing the
exact-match assertion. Linux passed; all three Windows-latest matrix
entries failed with `assert '"RUN_ID=abc12345"' == 'RUN_ID=abc12345'`.
Resolve by dropping the inner double-quotes (the value has no spaces
or shell metacharacters) and wrapping the YAML scalar in plain
double-quotes the same way other shell-step tests in this file do
(e.g. `run: "echo b-saw-..."`). Behaviour-equivalent on POSIX,
portable to cmd.exe.
2683 lines
91 KiB
Python
2683 lines
91 KiB
Python
"""Tests for the workflow engine subsystem.
|
|
|
|
Covers:
|
|
- Step registry & auto-discovery
|
|
- Base classes (StepBase, StepContext, StepResult)
|
|
- Expression engine
|
|
- All 10 built-in step types
|
|
- Workflow definition loading & validation
|
|
- Workflow engine execution & state persistence
|
|
- Workflow catalog & registry
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
import yaml
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.fixture
def temp_dir():
    """Yield a fresh temporary directory as a ``Path`` and remove it afterwards.

    ``tempfile.TemporaryDirectory`` owns the cleanup, so the directory is
    removed when the ``with`` block exits rather than by a separate manual
    ``shutil.rmtree`` call placed after the ``yield``.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        yield Path(tmpdir)
|
|
|
|
|
|
@pytest.fixture
def project_dir(temp_dir):
    """Lay out a mock spec-kit project inside *temp_dir* and return its root.

    Creates ``.specify/`` and ``.specify/workflows/`` under the temporary
    directory; the returned path is *temp_dir* itself.
    """
    dot_specify = temp_dir / ".specify"
    dot_specify.mkdir()
    workflows_root = dot_specify / "workflows"
    workflows_root.mkdir()
    return temp_dir
|
|
|
|
|
|
@pytest.fixture
def sample_workflow_yaml():
    """Provide the text of a small, valid two-step workflow definition.

    The document declares a required ``spec`` input, an optional ``scope``
    input defaulting to ``"full"``, and two command steps where the second
    step references the first step's output.
    """
    document = """
schema_version: "1.0"
workflow:
  id: "test-workflow"
  name: "Test Workflow"
  version: "1.0.0"
  description: "A test workflow"

inputs:
  spec:
    type: string
    required: true
  scope:
    type: string
    default: "full"

steps:
  - id: step-one
    command: speckit.specify
    input:
      args: "{{ inputs.spec }}"

  - id: step-two
    command: speckit.plan
    input:
      args: "{{ steps.step-one.output.command }}"
"""
    return document
|
|
|
|
|
|
@pytest.fixture
def sample_workflow_file(project_dir, sample_workflow_yaml):
    """Persist the sample workflow inside the mock project and return its path.

    The file is written as UTF-8 to
    ``.specify/workflows/test-workflow/workflow.yml`` under *project_dir*.
    """
    target_dir = project_dir / ".specify" / "workflows" / "test-workflow"
    target_dir.mkdir(parents=True, exist_ok=True)
    target_file = target_dir / "workflow.yml"
    target_file.write_text(sample_workflow_yaml, encoding="utf-8")
    return target_file
|
|
|
|
|
|
# ===== Step Registry Tests =====
|
|
|
|
class TestStepRegistry:
    """Exercise ``STEP_REGISTRY`` population and the registration helpers."""

    def test_registry_populated(self):
        """The registry contains at least ten entries."""
        from specify_cli.workflows import STEP_REGISTRY

        registered_count = len(STEP_REGISTRY)
        assert registered_count >= 10

    def test_all_step_types_registered(self):
        """Every expected step type key is present in the registry."""
        from specify_cli.workflows import STEP_REGISTRY

        expected_keys = {
            "command", "shell", "prompt", "gate", "if", "switch",
            "while", "do-while", "fan-out", "fan-in",
        }
        registered_keys = set(STEP_REGISTRY.keys())
        assert expected_keys.issubset(registered_keys)

    def test_get_step_type(self):
        """Looking up ``"command"`` returns a step whose ``type_key`` matches."""
        from specify_cli.workflows import get_step_type

        found = get_step_type("command")
        assert found is not None
        assert found.type_key == "command"

    def test_get_step_type_missing(self):
        """An unknown key yields ``None`` instead of raising."""
        from specify_cli.workflows import get_step_type

        found = get_step_type("nonexistent")
        assert found is None

    def test_register_step_duplicate_raises(self):
        """Registering an already-registered step type raises ``KeyError``."""
        from specify_cli.workflows import _register_step
        from specify_cli.workflows.steps.command import CommandStep

        # The instance is built inside the ``raises`` block, as in the
        # original, so an exception from construction is treated the same way.
        with pytest.raises(KeyError, match="already registered"):
            _register_step(CommandStep())

    def test_register_step_empty_key_raises(self):
        """A step whose ``type_key`` is empty is rejected with ``ValueError``."""
        from specify_cli.workflows import _register_step
        from specify_cli.workflows.base import StepBase, StepResult

        class EmptyStep(StepBase):
            # Deliberately blank key: this is the condition under test.
            type_key = ""

            def execute(self, config, context):
                return StepResult()

        with pytest.raises(ValueError, match="empty type_key"):
            _register_step(EmptyStep())
|
|
|
|
|
|
# ===== Base Classes Tests =====
|
|
|
|
class TestBaseClasses:
    """Check defaults and enum values of ``StepContext``, ``StepResult``, and the status enums."""

    def test_step_context_defaults(self):
        """A context built with no arguments exposes empty/``None`` defaults."""
        from specify_cli.workflows.base import StepContext

        blank = StepContext()
        assert blank.inputs == {}
        assert blank.steps == {}
        assert blank.item is None
        assert blank.fan_in == {}
        assert blank.default_integration is None

    def test_step_context_with_data(self):
        """Keyword arguments passed to the constructor are stored as given."""
        from specify_cli.workflows.base import StepContext

        populated = StepContext(
            inputs={"name": "test"},
            default_integration="claude",
            default_model="sonnet-4",
        )
        assert populated.inputs == {"name": "test"}
        assert populated.default_integration == "claude"
        assert populated.default_model == "sonnet-4"

    def test_step_result_defaults(self):
        """A default result is COMPLETED with empty output, no next steps, no error."""
        from specify_cli.workflows.base import StepResult, StepStatus

        outcome = StepResult()
        assert outcome.status == StepStatus.COMPLETED
        assert outcome.output == {}
        assert outcome.next_steps == []
        assert outcome.error is None

    def test_step_status_values(self):
        """Each ``StepStatus`` member compares equal to its lowercase string."""
        from specify_cli.workflows.base import StepStatus

        expected_pairs = (
            (StepStatus.PENDING, "pending"),
            (StepStatus.RUNNING, "running"),
            (StepStatus.COMPLETED, "completed"),
            (StepStatus.FAILED, "failed"),
            (StepStatus.SKIPPED, "skipped"),
            (StepStatus.PAUSED, "paused"),
        )
        for member, literal in expected_pairs:
            assert member == literal

    def test_run_status_values(self):
        """Each ``RunStatus`` member compares equal to its lowercase string."""
        from specify_cli.workflows.base import RunStatus

        expected_pairs = (
            (RunStatus.CREATED, "created"),
            (RunStatus.RUNNING, "running"),
            (RunStatus.PAUSED, "paused"),
            (RunStatus.COMPLETED, "completed"),
            (RunStatus.FAILED, "failed"),
            (RunStatus.ABORTED, "aborted"),
        )
        for member, literal in expected_pairs:
            assert member == literal
|
|
|
|
|
|
# ===== Expression Engine Tests =====
|
|
|
|
class TestExpressions:
    """Exercise the sandboxed ``{{ ... }}`` expression evaluator."""

    def test_simple_variable(self):
        """A bare ``inputs.<name>`` reference resolves to the input value."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        context = StepContext(inputs={"name": "login"})
        resolved = evaluate_expression("{{ inputs.name }}", context)
        assert resolved == "login"

    def test_step_output_reference(self):
        """A dotted path into a prior step's output resolves to the stored value."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        prior_steps = {"specify": {"output": {"file": "spec.md"}}}
        context = StepContext(steps=prior_steps)
        resolved = evaluate_expression("{{ steps.specify.output.file }}", context)
        assert resolved == "spec.md"

    def test_string_interpolation(self):
        """A template embedded in surrounding text is substituted in place."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        context = StepContext(inputs={"name": "login"})
        rendered = evaluate_expression("Feature: {{ inputs.name }} done", context)
        assert rendered == "Feature: login done"

    def test_comparison_equals(self):
        """``==`` yields real booleans for matching and non-matching values."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        context = StepContext(inputs={"scope": "full"})
        matches = evaluate_expression("{{ inputs.scope == 'full' }}", context)
        assert matches is True
        differs = evaluate_expression("{{ inputs.scope == 'partial' }}", context)
        assert differs is False

    def test_comparison_not_equals(self):
        """``!=`` works against a step id that contains a hyphen."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        prior_steps = {"run-tests": {"output": {"exit_code": 1}}}
        context = StepContext(steps=prior_steps)
        outcome = evaluate_expression(
            "{{ steps.run-tests.output.exit_code != 0 }}", context
        )
        assert outcome is True

    def test_numeric_comparison(self):
        """``>`` and ``<`` compare a numeric step output against a literal."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        prior_steps = {"plan": {"output": {"task_count": 7}}}
        context = StepContext(steps=prior_steps)
        above = evaluate_expression("{{ steps.plan.output.task_count > 5 }}", context)
        assert above is True
        below = evaluate_expression("{{ steps.plan.output.task_count < 5 }}", context)
        assert below is False

    def test_boolean_and(self):
        """``and`` of two true inputs evaluates to ``True``."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        context = StepContext(inputs={"a": True, "b": True})
        outcome = evaluate_expression("{{ inputs.a and inputs.b }}", context)
        assert outcome is True

    def test_boolean_or(self):
        """``or`` evaluates to ``True`` when only one operand is true."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        context = StepContext(inputs={"a": False, "b": True})
        outcome = evaluate_expression("{{ inputs.a or inputs.b }}", context)
        assert outcome is True

    def test_filter_default(self):
        """The ``default`` filter supplies a fallback for a missing input."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        context = StepContext()
        resolved = evaluate_expression(
            "{{ inputs.missing | default('fallback') }}", context
        )
        assert resolved == "fallback"

    def test_filter_join(self):
        """The ``join`` filter concatenates list items with the separator."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        context = StepContext(inputs={"tags": ["a", "b", "c"]})
        joined = evaluate_expression("{{ inputs.tags | join(', ') }}", context)
        assert joined == "a, b, c"

    def test_filter_contains(self):
        """The ``contains`` filter reports a substring match as ``True``."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        context = StepContext(inputs={"text": "hello world"})
        found = evaluate_expression("{{ inputs.text | contains('world') }}", context)
        assert found is True

    def test_condition_evaluation(self):
        """``evaluate_condition`` is true for a truthy input, false for a missing one."""
        from specify_cli.workflows.expressions import evaluate_condition
        from specify_cli.workflows.base import StepContext

        context = StepContext(inputs={"ready": True})
        present = evaluate_condition("{{ inputs.ready }}", context)
        assert present is True
        absent = evaluate_condition("{{ inputs.missing }}", context)
        assert absent is False

    def test_non_string_passthrough(self):
        """Non-string values are returned unchanged."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        context = StepContext()
        number = evaluate_expression(42, context)
        assert number == 42
        nothing = evaluate_expression(None, context)
        assert nothing is None

    def test_string_literal(self):
        """A quoted literal inside a template evaluates to the bare string."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        context = StepContext()
        resolved = evaluate_expression("{{ 'hello' }}", context)
        assert resolved == "hello"

    def test_numeric_literal(self):
        """A numeric literal inside a template compares equal to the number."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        context = StepContext()
        resolved = evaluate_expression("{{ 42 }}", context)
        assert resolved == 42

    def test_boolean_literal(self):
        """Lowercase ``true`` / ``false`` evaluate to Python booleans."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        context = StepContext()
        truthy = evaluate_expression("{{ true }}", context)
        assert truthy is True
        falsy = evaluate_expression("{{ false }}", context)
        assert falsy is False

    def test_list_indexing(self):
        """``[0]`` indexing into a list output, then attribute access, resolves."""
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        task_list = [{"file": "a.md"}, {"file": "b.md"}]
        context = StepContext(steps={"tasks": {"output": {"task_list": task_list}}})
        first_file = evaluate_expression(
            "{{ steps.tasks.output.task_list[0].file }}", context
        )
        assert first_file == "a.md"

    def test_context_run_id_resolves(self):
        """``{{ context.run_id }}`` yields the ``run_id`` set on the ``StepContext``.

        Pins the contract from issue #2590: templates may reference the
        engine-assigned run id (telemetry, artifact metadata, per-run
        scratch isolation).
        """
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        context = StepContext(run_id="a1b2c3d4")
        resolved = evaluate_expression("{{ context.run_id }}", context)
        assert resolved == "a1b2c3d4"

    def test_context_run_id_defaults_to_empty_when_unset(self):
        """``{{ context.run_id }}`` yields ``""`` when the context has no run id.

        Covers usage outside an active run (dry-run, validation, ad-hoc
        evaluation): the reference resolves to an empty string instead of
        raising.
        """
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        # Deliberately constructed without a run_id.
        context = StepContext()
        resolved = evaluate_expression("{{ context.run_id }}", context)
        assert resolved == ""

    def test_context_run_id_string_interpolation(self):
        """The run id is substituted inside a larger template string.

        This is the usual shape when stamping shell commands or artifact
        paths with the run id.
        """
        from specify_cli.workflows.expressions import evaluate_expression
        from specify_cli.workflows.base import StepContext

        context = StepContext(run_id="deadbeef")
        rendered = evaluate_expression("RUN_ID={{ context.run_id }}", context)
        assert rendered == "RUN_ID=deadbeef"
|
|
|
|
|
|
# ===== Integration Dispatch Tests =====
|
|
|
|
class TestBuildExecArgs:
    """Check the argv produced by ``build_exec_args`` for each integration."""

    def test_claude_exec_args(self):
        """Claude argv starts with ``claude -p <prompt>`` and carries model/format flags."""
        from specify_cli.integrations.claude import ClaudeIntegration

        integration = ClaudeIntegration()
        argv = integration.build_exec_args("do stuff", model="sonnet-4")
        assert argv[0] == "claude"
        assert argv[1] == "-p"
        assert argv[2] == "do stuff"
        for token in ("--model", "sonnet-4", "--output-format"):
            assert token in argv

    def test_gemini_exec_args(self):
        """Gemini argv starts with ``gemini -p`` and passes the model via ``-m``."""
        from specify_cli.integrations.gemini import GeminiIntegration

        integration = GeminiIntegration()
        argv = integration.build_exec_args("do stuff", model="gemini-2.5-pro")
        assert argv[0] == "gemini"
        assert argv[1] == "-p"
        assert "-m" in argv
        assert "gemini-2.5-pro" in argv

    def test_codex_exec_args(self):
        """Codex argv is ``codex exec <prompt>`` and includes ``--json``."""
        from specify_cli.integrations.codex import CodexIntegration

        integration = CodexIntegration()
        argv = integration.build_exec_args("do stuff")
        assert argv[0] == "codex"
        assert argv[1] == "exec"
        assert argv[2] == "do stuff"
        assert "--json" in argv

    def test_copilot_exec_args(self, monkeypatch):
        """With neither env var set, Copilot argv includes ``-p``, ``--yolo`` and ``--model``."""
        for env_name in ("SPECKIT_COPILOT_ALLOW_ALL_TOOLS", "SPECKIT_ALLOW_ALL_TOOLS"):
            monkeypatch.delenv(env_name, raising=False)
        from specify_cli.integrations.copilot import CopilotIntegration

        integration = CopilotIntegration()
        argv = integration.build_exec_args("do stuff", model="claude-sonnet-4-20250514")
        # The executable name is platform dependent.
        if os.name == "nt":
            expected_exec = "copilot.cmd"
        else:
            expected_exec = "copilot"
        assert argv[0] == expected_exec
        assert "-p" in argv
        assert "--yolo" in argv
        assert "--model" in argv

    def test_copilot_new_env_var_disables_yolo(self, monkeypatch):
        """``SPECKIT_COPILOT_ALLOW_ALL_TOOLS=0`` removes ``--yolo`` from the argv."""
        monkeypatch.setenv("SPECKIT_COPILOT_ALLOW_ALL_TOOLS", "0")
        monkeypatch.delenv("SPECKIT_ALLOW_ALL_TOOLS", raising=False)
        from specify_cli.integrations.copilot import CopilotIntegration

        integration = CopilotIntegration()
        argv = integration.build_exec_args("do stuff")
        assert "--yolo" not in argv

    def test_copilot_deprecated_env_var_still_honoured(self, monkeypatch):
        """The legacy ``SPECKIT_ALLOW_ALL_TOOLS=0`` still disables ``--yolo`` and warns."""
        monkeypatch.delenv("SPECKIT_COPILOT_ALLOW_ALL_TOOLS", raising=False)
        monkeypatch.setenv("SPECKIT_ALLOW_ALL_TOOLS", "0")
        import warnings
        from specify_cli.integrations.copilot import CopilotIntegration

        integration = CopilotIntegration()
        with warnings.catch_warnings(record=True) as captured:
            # Record every warning, including ones filtered by default.
            warnings.simplefilter("always")
            argv = integration.build_exec_args("do stuff")
        assert "--yolo" not in argv
        deprecation_seen = any(
            "SPECKIT_ALLOW_ALL_TOOLS is deprecated" in str(entry.message)
            and issubclass(entry.category, UserWarning)
            for entry in captured
        )
        assert deprecation_seen

    def test_copilot_new_env_var_takes_precedence(self, monkeypatch):
        """When both env vars are set, the Copilot-specific one wins."""
        monkeypatch.setenv("SPECKIT_COPILOT_ALLOW_ALL_TOOLS", "1")
        monkeypatch.setenv("SPECKIT_ALLOW_ALL_TOOLS", "0")
        from specify_cli.integrations.copilot import CopilotIntegration

        integration = CopilotIntegration()
        argv = integration.build_exec_args("do stuff")
        assert "--yolo" in argv

    def test_ide_only_returns_none(self):
        """The Windsurf integration returns ``None`` from ``build_exec_args``."""
        from specify_cli.integrations.windsurf import WindsurfIntegration

        integration = WindsurfIntegration()
        argv = integration.build_exec_args("test")
        assert argv is None

    def test_no_model_omits_flag(self):
        """Passing ``model=None`` leaves ``--model`` out of the argv."""
        from specify_cli.integrations.claude import ClaudeIntegration

        integration = ClaudeIntegration()
        argv = integration.build_exec_args("do stuff", model=None)
        assert "--model" not in argv

    def test_no_json_omits_flag(self):
        """Passing ``output_json=False`` leaves ``--output-format`` out of the argv."""
        from specify_cli.integrations.claude import ClaudeIntegration

        integration = ClaudeIntegration()
        argv = integration.build_exec_args("do stuff", output_json=False)
        assert "--output-format" not in argv
|
|
|
|
|
|
# ===== Step Type Tests =====
|
|
|
|
class TestCommandStep:
    """Test the command step type.

    Every test that calls ``execute()`` patches
    ``specify_cli.workflows.steps.command.shutil.which`` so the outcome does
    not depend on which CLIs are installed on the machine running the suite.
    """

    def test_execute_basic(self):
        """Without a CLI the step fails but still reports its resolved inputs."""
        from unittest.mock import patch
        from specify_cli.workflows.steps.command import CommandStep
        from specify_cli.workflows.base import StepContext, StepStatus

        step = CommandStep()
        ctx = StepContext(
            inputs={"name": "login"},
            default_integration="claude",
        )
        config = {
            "id": "test",
            "command": "speckit.specify",
            "input": {"args": "{{ inputs.name }}"},
        }
        with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None):
            result = step.execute(config, ctx)
        assert result.status == StepStatus.FAILED
        assert result.output["command"] == "speckit.specify"
        assert result.output["integration"] == "claude"
        assert result.output["input"]["args"] == "login"

    def test_validate_missing_command(self):
        """Validation reports a config that lacks the ``command`` key."""
        from specify_cli.workflows.steps.command import CommandStep

        step = CommandStep()
        errors = step.validate({"id": "test"})
        assert any("missing 'command'" in e for e in errors)

    def test_step_override_integration(self):
        """A step-level ``integration`` wins over the context default."""
        from unittest.mock import patch
        from specify_cli.workflows.steps.command import CommandStep
        from specify_cli.workflows.base import StepContext

        step = CommandStep()
        ctx = StepContext(default_integration="claude")
        config = {
            "id": "test",
            "command": "speckit.plan",
            "integration": "gemini",
            "input": {},
        }
        with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None):
            result = step.execute(config, ctx)
        assert result.output["integration"] == "gemini"

    def test_step_override_model(self):
        """A step-level ``model`` wins over the context default."""
        from unittest.mock import patch
        from specify_cli.workflows.steps.command import CommandStep
        from specify_cli.workflows.base import StepContext

        step = CommandStep()
        ctx = StepContext(default_model="sonnet-4")
        config = {
            "id": "test",
            "command": "speckit.implement",
            "model": "opus-4",
            "input": {},
        }
        # Patched for consistency with the sibling tests: keeps the test
        # independent of any CLI present on the host.
        with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None):
            result = step.execute(config, ctx)
        assert result.output["model"] == "opus-4"

    def test_options_merge(self):
        """Context default options and step options both appear in the output."""
        from unittest.mock import patch
        from specify_cli.workflows.steps.command import CommandStep
        from specify_cli.workflows.base import StepContext

        step = CommandStep()
        ctx = StepContext(default_options={"max-tokens": 8000})
        config = {
            "id": "test",
            "command": "speckit.plan",
            "options": {"thinking-budget": 32768},
            "input": {},
        }
        # Patched for consistency with the sibling tests: keeps the test
        # independent of any CLI present on the host.
        with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None):
            result = step.execute(config, ctx)
        assert result.output["options"]["max-tokens"] == 8000
        assert result.output["options"]["thinking-budget"] == 32768

    def test_dispatch_not_attempted_without_cli(self, tmp_path):
        """When the CLI tool is not installed, step should fail."""
        from unittest.mock import patch
        from specify_cli.workflows.steps.command import CommandStep
        from specify_cli.workflows.base import StepContext, StepStatus

        step = CommandStep()
        ctx = StepContext(
            inputs={"name": "login"},
            default_integration="claude",
            # tmp_path instead of a hard-coded "/tmp" so the test does not
            # assume a POSIX filesystem layout.
            project_root=str(tmp_path),
        )
        config = {
            "id": "test",
            "command": "speckit.specify",
            "input": {"args": "{{ inputs.name }}"},
        }
        with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None):
            result = step.execute(config, ctx)
        assert result.status == StepStatus.FAILED
        assert result.output["dispatched"] is False
        assert result.error is not None

    def test_dispatch_with_mock_cli(self, tmp_path, monkeypatch):
        """When the CLI is installed, dispatch invokes the command by name."""
        from unittest.mock import patch, MagicMock
        from specify_cli.workflows.steps.command import CommandStep
        from specify_cli.workflows.base import StepContext, StepStatus

        step = CommandStep()
        ctx = StepContext(
            inputs={"name": "login"},
            default_integration="claude",
            project_root=str(tmp_path),
        )
        config = {
            "id": "test",
            "command": "speckit.specify",
            "input": {"args": "{{ inputs.name }}"},
        }

        # Stand-in for the completed subprocess: success with JSON on stdout.
        mock_result = MagicMock()
        mock_result.returncode = 0
        mock_result.stdout = '{"result": "done"}'
        mock_result.stderr = ""

        with patch("specify_cli.workflows.steps.command.shutil.which", return_value="/usr/local/bin/claude"), \
             patch("subprocess.run", return_value=mock_result) as mock_run:
            result = step.execute(config, ctx)

        assert result.status == StepStatus.COMPLETED
        assert result.output["dispatched"] is True
        assert result.output["exit_code"] == 0
        # Verify the CLI was called with -p and the skill invocation
        call_args = mock_run.call_args
        assert call_args[0][0][0] == "claude"
        assert call_args[0][0][1] == "-p"
        # Claude is a SkillsIntegration so uses /speckit-specify
        assert "/speckit-specify login" in call_args[0][0][2]

    def test_dispatch_failure_returns_failed_status(self, tmp_path):
        """When the CLI exits non-zero, the step should fail."""
        from unittest.mock import patch, MagicMock
        from specify_cli.workflows.steps.command import CommandStep
        from specify_cli.workflows.base import StepContext, StepStatus

        step = CommandStep()
        ctx = StepContext(
            inputs={},
            default_integration="claude",
            project_root=str(tmp_path),
        )
        config = {
            "id": "test",
            "command": "speckit.specify",
            "input": {"args": "test"},
        }

        # Stand-in for the completed subprocess: non-zero exit with stderr text.
        mock_result = MagicMock()
        mock_result.returncode = 1
        mock_result.stdout = ""
        mock_result.stderr = "API error"

        with patch("specify_cli.workflows.steps.command.shutil.which", return_value="/usr/local/bin/claude"), \
             patch("subprocess.run", return_value=mock_result):
            result = step.execute(config, ctx)

        assert result.status == StepStatus.FAILED
        assert result.output["dispatched"] is True
        assert result.output["exit_code"] == 1
|
|
|
|
|
|
class TestPromptStep:
    """Exercise the prompt step: rendering, overrides, dispatch, validation."""

    def test_execute_basic(self):
        """Without a CLI the step fails but reports the rendered prompt."""
        from unittest.mock import patch
        from specify_cli.workflows.steps.prompt import PromptStep
        from specify_cli.workflows.base import StepContext, StepStatus

        prompt_step = PromptStep()
        context = StepContext(
            inputs={"file": "auth.py"},
            default_integration="claude",
        )
        step_config = {
            "id": "review",
            "type": "prompt",
            "prompt": "Review {{ inputs.file }} for security issues",
        }
        with patch("specify_cli.workflows.steps.prompt.shutil.which", return_value=None):
            outcome = prompt_step.execute(step_config, context)
        produced = outcome.output
        assert outcome.status == StepStatus.FAILED
        assert produced["prompt"] == "Review auth.py for security issues"
        assert produced["integration"] == "claude"
        assert produced["dispatched"] is False

    def test_execute_with_step_integration(self):
        """A step-level ``integration`` wins over the context default."""
        from unittest.mock import patch
        from specify_cli.workflows.steps.prompt import PromptStep
        from specify_cli.workflows.base import StepContext

        prompt_step = PromptStep()
        context = StepContext(default_integration="claude")
        step_config = {
            "id": "review",
            "type": "prompt",
            "prompt": "Summarize the codebase",
            "integration": "gemini",
        }
        with patch("specify_cli.workflows.steps.prompt.shutil.which", return_value=None):
            outcome = prompt_step.execute(step_config, context)
        assert outcome.output["integration"] == "gemini"

    def test_execute_with_model(self):
        """A step-level ``model`` wins over the context default."""
        from unittest.mock import patch
        from specify_cli.workflows.steps.prompt import PromptStep
        from specify_cli.workflows.base import StepContext

        prompt_step = PromptStep()
        context = StepContext(default_integration="claude", default_model="sonnet-4")
        step_config = {
            "id": "review",
            "type": "prompt",
            "prompt": "hello",
            "model": "opus-4",
        }
        with patch("specify_cli.workflows.steps.prompt.shutil.which", return_value=None):
            outcome = prompt_step.execute(step_config, context)
        assert outcome.output["model"] == "opus-4"

    def test_dispatch_with_mock_cli(self, tmp_path):
        """With a CLI found and a zero exit code, the step completes as dispatched."""
        from unittest.mock import patch, MagicMock
        from specify_cli.workflows.steps.prompt import PromptStep
        from specify_cli.workflows.base import StepContext, StepStatus

        prompt_step = PromptStep()
        context = StepContext(
            default_integration="claude",
            project_root=str(tmp_path),
        )
        step_config = {
            "id": "ask",
            "type": "prompt",
            "prompt": "Explain this code",
        }

        # Stand-in for the completed subprocess.
        fake_process = MagicMock()
        fake_process.returncode = 0
        fake_process.stdout = "Here is the explanation"
        fake_process.stderr = ""

        with patch(
            "specify_cli.workflows.steps.prompt.shutil.which",
            return_value="/usr/local/bin/claude",
        ):
            with patch("subprocess.run", return_value=fake_process):
                outcome = prompt_step.execute(step_config, context)

        assert outcome.status == StepStatus.COMPLETED
        assert outcome.output["dispatched"] is True
        assert outcome.output["exit_code"] == 0

    def test_validate_missing_prompt(self):
        """Validation reports a config that lacks the ``prompt`` key."""
        from specify_cli.workflows.steps.prompt import PromptStep

        prompt_step = PromptStep()
        problems = prompt_step.validate({"id": "test"})
        assert any("missing 'prompt'" in message for message in problems)

    def test_validate_valid(self):
        """A config with ``id`` and ``prompt`` validates with no errors."""
        from specify_cli.workflows.steps.prompt import PromptStep

        prompt_step = PromptStep()
        problems = prompt_step.validate({"id": "test", "prompt": "do something"})
        assert problems == []
|
|
|
|
|
|
class TestShellStep:
    """Exercise the shell step: success, failure, and validation."""

    def test_execute_echo(self):
        """``echo hello`` completes with exit code 0 and the text on stdout."""
        from specify_cli.workflows.steps.shell import ShellStep
        from specify_cli.workflows.base import StepContext, StepStatus

        shell_step = ShellStep()
        context = StepContext()
        step_config = {"id": "test", "run": "echo hello"}
        outcome = shell_step.execute(step_config, context)
        assert outcome.status == StepStatus.COMPLETED
        assert outcome.output["exit_code"] == 0
        assert "hello" in outcome.output["stdout"]

    def test_execute_failure(self):
        """``exit 1`` fails the step, records the exit code, and sets an error."""
        from specify_cli.workflows.steps.shell import ShellStep
        from specify_cli.workflows.base import StepContext, StepStatus

        shell_step = ShellStep()
        context = StepContext()
        step_config = {"id": "test", "run": "exit 1"}
        outcome = shell_step.execute(step_config, context)
        assert outcome.status == StepStatus.FAILED
        assert outcome.output["exit_code"] == 1
        assert outcome.error is not None

    def test_validate_missing_run(self):
        """Validation reports a config that lacks the ``run`` key."""
        from specify_cli.workflows.steps.shell import ShellStep

        shell_step = ShellStep()
        problems = shell_step.validate({"id": "test"})
        assert any("missing 'run'" in message for message in problems)
|
|
|
|
|
|
class TestGateStep:
    """Exercise the gate step: pause behaviour and validation."""

    def test_execute_returns_paused(self):
        """Executing a gate pauses and echoes its message and options."""
        from specify_cli.workflows.steps.gate import GateStep
        from specify_cli.workflows.base import StepContext, StepStatus

        gate = GateStep()
        context = StepContext()
        gate_config = {
            "id": "review",
            "message": "Review the spec.",
            "options": ["approve", "reject"],
            "on_reject": "abort",
        }
        outcome = gate.execute(gate_config, context)
        assert outcome.status == StepStatus.PAUSED
        assert outcome.output["message"] == "Review the spec."
        assert outcome.output["options"] == ["approve", "reject"]

    def test_validate_missing_message(self):
        """Validation reports a config that lacks the ``message`` key."""
        from specify_cli.workflows.steps.gate import GateStep

        gate = GateStep()
        problems = gate.validate({"id": "test", "options": ["approve"]})
        assert any("missing 'message'" in message for message in problems)

    def test_validate_invalid_on_reject(self):
        """An unrecognised ``on_reject`` value produces an error mentioning the field."""
        from specify_cli.workflows.steps.gate import GateStep

        gate = GateStep()
        gate_config = {
            "id": "test",
            "message": "Review",
            "on_reject": "invalid",
        }
        problems = gate.validate(gate_config)
        assert any("on_reject" in message for message in problems)
|
|
|
|
|
|
class TestIfThenStep:
    """Exercise the if/then/else step: branch selection and validation."""

    def test_execute_then_branch(self):
        """A true condition selects the single ``then`` step."""
        from specify_cli.workflows.steps.if_then import IfThenStep
        from specify_cli.workflows.base import StepContext

        branch_config = {
            "id": "check",
            "condition": "{{ inputs.scope == 'full' }}",
            "then": [{"id": "a", "command": "speckit.tasks"}],
            "else": [{"id": "b", "command": "speckit.plan"}],
        }
        context = StepContext(inputs={"scope": "full"})
        outcome = IfThenStep().execute(branch_config, context)

        assert outcome.output["condition_result"] is True
        assert len(outcome.next_steps) == 1
        assert outcome.next_steps[0]["id"] == "a"

    def test_execute_else_branch(self):
        """A false condition selects the ``else`` step."""
        from specify_cli.workflows.steps.if_then import IfThenStep
        from specify_cli.workflows.base import StepContext

        branch_config = {
            "id": "check",
            "condition": "{{ inputs.scope == 'full' }}",
            "then": [{"id": "a", "command": "speckit.tasks"}],
            "else": [{"id": "b", "command": "speckit.plan"}],
        }
        context = StepContext(inputs={"scope": "backend"})
        outcome = IfThenStep().execute(branch_config, context)

        assert outcome.output["condition_result"] is False
        assert outcome.next_steps[0]["id"] == "b"

    def test_validate_missing_condition(self):
        """Validation reports a config without ``condition``."""
        from specify_cli.workflows.steps.if_then import IfThenStep

        problems = IfThenStep().validate({"id": "test", "then": []})
        assert any("missing 'condition'" in problem for problem in problems)
|
|
|
|
|
|
class TestSwitchStep:
    """Exercise the switch step: case matching, default handling, validation."""

    def test_execute_matches_case(self):
        """An expression equal to a case key routes to that case's steps."""
        from specify_cli.workflows.steps.switch import SwitchStep
        from specify_cli.workflows.base import StepContext

        prior_results = {"review": {"output": {"choice": "approve"}}}
        switch_config = {
            "id": "route",
            "expression": "{{ steps.review.output.choice }}",
            "cases": {
                "approve": [{"id": "plan", "command": "speckit.plan"}],
                "reject": [{"id": "log", "type": "shell", "run": "echo rejected"}],
            },
            "default": [{"id": "abort", "type": "gate", "message": "Unknown"}],
        }
        outcome = SwitchStep().execute(switch_config, StepContext(steps=prior_results))

        assert outcome.output["matched_case"] == "approve"
        assert outcome.next_steps[0]["id"] == "plan"

    def test_execute_falls_to_default(self):
        """An unmatched expression falls through to the ``default`` steps."""
        from specify_cli.workflows.steps.switch import SwitchStep
        from specify_cli.workflows.base import StepContext

        prior_results = {"review": {"output": {"choice": "unknown"}}}
        switch_config = {
            "id": "route",
            "expression": "{{ steps.review.output.choice }}",
            "cases": {
                "approve": [{"id": "plan", "command": "speckit.plan"}],
            },
            "default": [{"id": "fallback", "type": "gate", "message": "Fallback"}],
        }
        outcome = SwitchStep().execute(switch_config, StepContext(steps=prior_results))

        assert outcome.output["matched_case"] == "__default__"
        assert outcome.next_steps[0]["id"] == "fallback"

    def test_execute_no_default_no_match(self):
        """With no match and no ``default``, nothing is scheduled."""
        from specify_cli.workflows.steps.switch import SwitchStep
        from specify_cli.workflows.base import StepContext

        prior_results = {"review": {"output": {"choice": "other"}}}
        switch_config = {
            "id": "route",
            "expression": "{{ steps.review.output.choice }}",
            "cases": {
                "approve": [{"id": "plan", "command": "speckit.plan"}],
            },
        }
        outcome = SwitchStep().execute(switch_config, StepContext(steps=prior_results))

        assert outcome.output["matched_case"] == "__default__"
        assert outcome.next_steps == []

    def test_validate_missing_expression(self):
        """Validation reports a config without ``expression``."""
        from specify_cli.workflows.steps.switch import SwitchStep

        problems = SwitchStep().validate({"id": "test", "cases": {}})
        assert any("missing 'expression'" in problem for problem in problems)

    def test_validate_invalid_cases_and_default(self):
        """Validation reports non-list case bodies and a non-list ``default``."""
        from specify_cli.workflows.steps.switch import SwitchStep

        switch_config = {
            "id": "test",
            "expression": "{{ x }}",
            "cases": {"a": "not-a-list"},
            "default": "also-bad",
        }
        problems = SwitchStep().validate(switch_config)

        assert any("case 'a' must be a list" in problem for problem in problems)
        assert any("'default' must be a list" in problem for problem in problems)
|
|
|
|
|
|
class TestWhileStep:
    """Exercise the while-loop step: condition evaluation and validation."""

    def test_execute_condition_true(self):
        """A true condition schedules the loop body."""
        from specify_cli.workflows.steps.while_loop import WhileStep
        from specify_cli.workflows.base import StepContext

        prior_results = {"run-tests": {"output": {"exit_code": 1}}}
        loop_config = {
            "id": "retry",
            "condition": "{{ steps.run-tests.output.exit_code != 0 }}",
            "max_iterations": 5,
            "steps": [{"id": "fix", "command": "speckit.implement"}],
        }
        outcome = WhileStep().execute(loop_config, StepContext(steps=prior_results))

        assert outcome.output["condition_result"] is True
        assert len(outcome.next_steps) == 1

    def test_execute_condition_false(self):
        """A false condition schedules nothing."""
        from specify_cli.workflows.steps.while_loop import WhileStep
        from specify_cli.workflows.base import StepContext

        prior_results = {"run-tests": {"output": {"exit_code": 0}}}
        loop_config = {
            "id": "retry",
            "condition": "{{ steps.run-tests.output.exit_code != 0 }}",
            "max_iterations": 5,
            "steps": [{"id": "fix", "command": "speckit.implement"}],
        }
        outcome = WhileStep().execute(loop_config, StepContext(steps=prior_results))

        assert outcome.output["condition_result"] is False
        assert outcome.next_steps == []

    def test_validate_missing_fields(self):
        """Validation reports a config without ``condition``."""
        from specify_cli.workflows.steps.while_loop import WhileStep

        problems = WhileStep().validate({"id": "test", "steps": []})
        assert any("missing 'condition'" in problem for problem in problems)
        # max_iterations is optional (defaults to 10)

    def test_validate_invalid_max_iterations(self):
        """Validation rejects ``max_iterations`` of 0."""
        from specify_cli.workflows.steps.while_loop import WhileStep

        loop_config = {
            "id": "test",
            "condition": "{{ true }}",
            "max_iterations": 0,
            "steps": [],
        }
        problems = WhileStep().validate(loop_config)
        assert any("must be an integer >= 1" in problem for problem in problems)
|
|
|
|
|
|
class TestDoWhileStep:
    """Exercise the do-while step: first-pass execution and validation."""

    def test_execute_always_runs_once(self):
        """The body is scheduled even when the condition is ``{{ false }}``."""
        from specify_cli.workflows.steps.do_while import DoWhileStep
        from specify_cli.workflows.base import StepContext

        loop_config = {
            "id": "cycle",
            "condition": "{{ false }}",
            "max_iterations": 3,
            "steps": [{"id": "refine", "command": "speckit.specify"}],
        }
        outcome = DoWhileStep().execute(loop_config, StepContext())

        assert len(outcome.next_steps) == 1
        assert outcome.output["loop_type"] == "do-while"
        assert outcome.output["condition"] == "{{ false }}"

    def test_execute_with_true_condition(self):
        """With a true condition the body is scheduled and the cap is reported."""
        from specify_cli.workflows.steps.do_while import DoWhileStep
        from specify_cli.workflows.base import StepContext

        loop_config = {
            "id": "cycle",
            "condition": "{{ true }}",
            "max_iterations": 5,
            "steps": [{"id": "work", "command": "speckit.plan"}],
        }
        outcome = DoWhileStep().execute(loop_config, StepContext())

        # Body always executes on first call regardless of condition
        assert len(outcome.next_steps) == 1
        assert outcome.output["max_iterations"] == 5

    def test_execute_empty_steps(self):
        """An empty body schedules nothing and still completes."""
        from specify_cli.workflows.steps.do_while import DoWhileStep
        from specify_cli.workflows.base import StepContext

        loop_config = {
            "id": "empty",
            "condition": "{{ false }}",
            "max_iterations": 1,
            "steps": [],
        }
        outcome = DoWhileStep().execute(loop_config, StepContext())

        assert outcome.next_steps == []
        assert outcome.status.value == "completed"

    def test_validate_missing_fields(self):
        """Validation reports a config without ``condition``."""
        from specify_cli.workflows.steps.do_while import DoWhileStep

        problems = DoWhileStep().validate({"id": "test", "steps": []})
        assert any("missing 'condition'" in problem for problem in problems)
        # max_iterations is optional (defaults to 10)

    def test_validate_steps_not_list(self):
        """Validation rejects a ``steps`` value that is not a list."""
        from specify_cli.workflows.steps.do_while import DoWhileStep

        loop_config = {
            "id": "test",
            "condition": "{{ true }}",
            "max_iterations": 3,
            "steps": "not-a-list",
        }
        problems = DoWhileStep().validate(loop_config)
        assert any("'steps' must be a list" in problem for problem in problems)
|
|
|
|
|
|
class TestFanOutStep:
    """Exercise the fan-out step: item resolution and validation."""

    def test_execute_with_items(self):
        """A two-element item list is counted and the concurrency cap reported."""
        from specify_cli.workflows.steps.fan_out import FanOutStep
        from specify_cli.workflows.base import StepContext

        task_list = [
            {"file": "a.md"},
            {"file": "b.md"},
        ]
        context = StepContext(steps={"tasks": {"output": {"task_list": task_list}}})
        fan_config = {
            "id": "parallel",
            "items": "{{ steps.tasks.output.task_list }}",
            "max_concurrency": 3,
            "step": {"id": "impl", "command": "speckit.implement"},
        }
        outcome = FanOutStep().execute(fan_config, context)

        assert outcome.output["item_count"] == 2
        assert outcome.output["max_concurrency"] == 3

    def test_execute_non_list_items_resolves_empty(self):
        """An ``items`` expression over an undefined name yields no items."""
        from specify_cli.workflows.steps.fan_out import FanOutStep
        from specify_cli.workflows.base import StepContext

        fan_config = {
            "id": "parallel",
            "items": "{{ undefined_var }}",
            "step": {"id": "impl", "command": "speckit.implement"},
        }
        outcome = FanOutStep().execute(fan_config, StepContext())

        assert outcome.output["item_count"] == 0
        assert outcome.output["items"] == []

    def test_validate_missing_fields(self):
        """Validation reports both missing ``items`` and missing ``step``."""
        from specify_cli.workflows.steps.fan_out import FanOutStep

        problems = FanOutStep().validate({"id": "test"})

        assert any("missing 'items'" in problem for problem in problems)
        assert any("missing 'step'" in problem for problem in problems)

    def test_validate_step_not_mapping(self):
        """Validation rejects a ``step`` value that is not a mapping."""
        from specify_cli.workflows.steps.fan_out import FanOutStep

        fan_config = {
            "id": "test",
            "items": "{{ x }}",
            "step": "not-a-dict",
        }
        problems = FanOutStep().validate(fan_config)
        assert any("'step' must be a mapping" in problem for problem in problems)
|
|
|
|
|
|
class TestFanInStep:
    """Exercise the fan-in step: result collection and validation."""

    def test_execute_collects_results(self):
        """The output of the single awaited step is collected."""
        from specify_cli.workflows.steps.fan_in import FanInStep
        from specify_cli.workflows.base import StepContext

        prior_results = {
            "parallel": {"output": {"item_count": 2, "status": "done"}},
        }
        join_config = {
            "id": "collect",
            "wait_for": ["parallel"],
            "output": {},
        }
        outcome = FanInStep().execute(join_config, StepContext(steps=prior_results))

        collected = outcome.output["results"]
        assert len(collected) == 1
        assert collected[0]["item_count"] == 2

    def test_execute_multiple_wait_for(self):
        """Outputs are collected in ``wait_for`` order."""
        from specify_cli.workflows.steps.fan_in import FanInStep
        from specify_cli.workflows.base import StepContext

        prior_results = {
            "task-a": {"output": {"file": "a.md"}},
            "task-b": {"output": {"file": "b.md"}},
        }
        join_config = {
            "id": "collect",
            "wait_for": ["task-a", "task-b"],
            "output": {},
        }
        outcome = FanInStep().execute(join_config, StepContext(steps=prior_results))

        collected = outcome.output["results"]
        assert len(collected) == 2
        assert collected[0]["file"] == "a.md"
        assert collected[1]["file"] == "b.md"

    def test_execute_missing_wait_for_step(self):
        """An awaited step with no recorded result contributes an empty dict."""
        from specify_cli.workflows.steps.fan_in import FanInStep
        from specify_cli.workflows.base import StepContext

        join_config = {
            "id": "collect",
            "wait_for": ["nonexistent"],
            "output": {},
        }
        outcome = FanInStep().execute(join_config, StepContext(steps={}))

        assert outcome.output["results"] == [{}]

    def test_validate_empty_wait_for(self):
        """Validation rejects an empty ``wait_for`` list."""
        from specify_cli.workflows.steps.fan_in import FanInStep

        problems = FanInStep().validate({"id": "test", "wait_for": []})
        assert any("non-empty list" in problem for problem in problems)

    def test_validate_wait_for_not_list(self):
        """Validation rejects a ``wait_for`` value that is not a list."""
        from specify_cli.workflows.steps.fan_in import FanInStep

        problems = FanInStep().validate({"id": "test", "wait_for": "not-a-list"})
        assert any("non-empty list" in problem for problem in problems)
|
|
|
|
|
|
# ===== Workflow Definition Tests =====
|
|
|
|
class TestWorkflowDefinition:
    """Check that WorkflowDefinition loads from files and strings."""

    def test_from_yaml(self, sample_workflow_file):
        """Loading the sample file exposes id, name, version and two steps."""
        from specify_cli.workflows.engine import WorkflowDefinition

        loaded = WorkflowDefinition.from_yaml(sample_workflow_file)

        assert loaded.id == "test-workflow"
        assert loaded.name == "Test Workflow"
        assert loaded.version == "1.0.0"
        assert len(loaded.steps) == 2

    def test_from_string(self, sample_workflow_yaml):
        """Parsing the sample YAML text exposes the id and two inputs."""
        from specify_cli.workflows.engine import WorkflowDefinition

        loaded = WorkflowDefinition.from_string(sample_workflow_yaml)

        assert loaded.id == "test-workflow"
        assert len(loaded.inputs) == 2

    def test_from_string_invalid(self):
        """A YAML document whose root is a list is rejected with ValueError."""
        from specify_cli.workflows.engine import WorkflowDefinition

        with pytest.raises(ValueError, match="must be a mapping"):
            WorkflowDefinition.from_string("- just a list")

    def test_inputs_parsed(self, sample_workflow_yaml):
        """Input declarations keep their ``required`` and ``default`` fields."""
        from specify_cli.workflows.engine import WorkflowDefinition

        declared = WorkflowDefinition.from_string(sample_workflow_yaml).inputs

        assert "spec" in declared
        assert declared["spec"]["required"] is True
        assert declared["scope"]["default"] == "full"
|
|
|
|
|
|
# ===== Workflow Validation Tests =====
|
|
|
|
class TestWorkflowValidation:
    """Check the error messages produced by ``validate_workflow``.

    Each test parses an inline YAML document and asserts on the returned
    error list. The YAML nesting matters: ``id``/``name``/``version`` sit
    under ``workflow:`` and step fields sit under their ``- id:`` item.
    """

    def test_valid_workflow(self, sample_workflow_yaml):
        """The sample workflow validates with no errors."""
        from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow

        definition = WorkflowDefinition.from_string(sample_workflow_yaml)
        errors = validate_workflow(definition)
        assert errors == []

    def test_missing_id(self):
        """A workflow without ``workflow.id`` is reported."""
        from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow

        definition = WorkflowDefinition.from_string("""
workflow:
  name: "Test"
  version: "1.0.0"
steps:
  - id: step-one
    command: speckit.specify
""")
        errors = validate_workflow(definition)
        assert any("workflow.id" in e for e in errors)

    def test_invalid_id_format(self):
        """An id containing uppercase, spaces and punctuation is reported."""
        from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow

        definition = WorkflowDefinition.from_string("""
workflow:
  id: "Invalid ID!"
  name: "Test"
  version: "1.0.0"
steps:
  - id: step-one
    command: speckit.specify
""")
        errors = validate_workflow(definition)
        assert any("lowercase alphanumeric" in e for e in errors)

    def test_no_steps(self):
        """An empty ``steps`` list is reported."""
        from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow

        definition = WorkflowDefinition.from_string("""
workflow:
  id: "test"
  name: "Test"
  version: "1.0.0"
steps: []
""")
        errors = validate_workflow(definition)
        assert any("no steps" in e.lower() for e in errors)

    def test_duplicate_step_ids(self):
        """Two steps sharing one id are reported."""
        from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow

        definition = WorkflowDefinition.from_string("""
workflow:
  id: "test"
  name: "Test"
  version: "1.0.0"
steps:
  - id: same-id
    command: speckit.specify
  - id: same-id
    command: speckit.plan
""")
        errors = validate_workflow(definition)
        assert any("Duplicate" in e for e in errors)

    def test_invalid_step_type(self):
        """An unknown step ``type`` is reported."""
        from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow

        definition = WorkflowDefinition.from_string("""
workflow:
  id: "test"
  name: "Test"
  version: "1.0.0"
steps:
  - id: bad
    type: nonexistent
""")
        errors = validate_workflow(definition)
        assert any("invalid type" in e.lower() for e in errors)

    def test_nested_step_validation(self):
        """An ``if`` step with well-formed nested branches validates cleanly."""
        from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow

        definition = WorkflowDefinition.from_string("""
workflow:
  id: "test"
  name: "Test"
  version: "1.0.0"
steps:
  - id: branch
    type: if
    condition: "{{ true }}"
    then:
      - id: nested-a
        command: speckit.specify
    else:
      - id: nested-b
        command: speckit.plan
""")
        errors = validate_workflow(definition)
        assert errors == []

    def test_invalid_input_type(self):
        """An input declared with ``type: array`` is reported."""
        from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow

        definition = WorkflowDefinition.from_string("""
workflow:
  id: "test"
  name: "Test"
  version: "1.0.0"
inputs:
  bad:
    type: array
steps:
  - id: step-one
    command: speckit.specify
""")
        errors = validate_workflow(definition)
        assert any("invalid type" in e.lower() for e in errors)
|
|
|
|
|
|
# ===== Workflow Engine Tests =====
|
|
|
|
class TestWorkflowEngine:
|
|
"""Test WorkflowEngine execution."""
|
|
|
|
def test_load_from_file(self, sample_workflow_file, project_dir):
    """``load_workflow`` accepts a filesystem path given as a string."""
    from specify_cli.workflows.engine import WorkflowEngine

    workflow_path = str(sample_workflow_file)
    loaded = WorkflowEngine(project_dir).load_workflow(workflow_path)
    assert loaded.id == "test-workflow"
|
|
|
|
def test_load_from_installed_id(self, sample_workflow_file, project_dir):
    """``load_workflow`` resolves the id ``test-workflow``.

    ``sample_workflow_file`` is requested but not referenced in the body;
    presumably the fixture places the workflow where the engine looks up
    ids (verify against the fixture definition).
    """
    from specify_cli.workflows.engine import WorkflowEngine

    loaded = WorkflowEngine(project_dir).load_workflow("test-workflow")
    assert loaded.id == "test-workflow"
|
|
|
|
def test_load_not_found(self, project_dir):
    """Loading the id ``nonexistent`` raises FileNotFoundError."""
    from specify_cli.workflows.engine import WorkflowEngine

    workflow_engine = WorkflowEngine(project_dir)
    with pytest.raises(FileNotFoundError):
        workflow_engine.load_workflow("nonexistent")
|
|
|
|
def test_execute_simple_workflow(self, project_dir):
    """Run a one-step command workflow with ``shutil.which`` patched to None.

    The run is expected to end FAILED (presumably because no integration
    CLI can be located — confirm against the command step), while the step
    result still records the command name and the templated ``args``
    resolved from the provided input.
    """
    from unittest.mock import patch
    from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
    from specify_cli.workflows.base import RunStatus

    yaml_str = """
schema_version: "1.0"
workflow:
  id: "simple"
  name: "Simple"
  version: "1.0.0"
  integration: claude
inputs:
  name:
    type: string
    default: "test"
steps:
  - id: step-one
    command: speckit.specify
    input:
      args: "{{ inputs.name }}"
"""
    definition = WorkflowDefinition.from_string(yaml_str)
    engine = WorkflowEngine(project_dir)
    with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None):
        state = engine.execute(definition, {"name": "login"})

    assert state.status == RunStatus.FAILED
    assert "step-one" in state.step_results
    assert state.step_results["step-one"]["output"]["command"] == "speckit.specify"
    assert state.step_results["step-one"]["output"]["input"]["args"] == "login"
|
|
|
|
def test_execute_with_gate_pauses(self, project_dir):
    """A gate between two shell steps leaves the run PAUSED at the gate."""
    from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
    from specify_cli.workflows.base import RunStatus

    yaml_str = """
schema_version: "1.0"
workflow:
  id: "gated"
  name: "Gated"
  version: "1.0.0"
steps:
  - id: step-one
    type: shell
    run: "echo test"
  - id: gate
    type: gate
    message: "Review?"
    options: [approve, reject]
    on_reject: abort
  - id: step-two
    type: shell
    run: "echo done"
"""
    definition = WorkflowDefinition.from_string(yaml_str)
    engine = WorkflowEngine(project_dir)
    state = engine.execute(definition)

    assert state.status == RunStatus.PAUSED
    assert "gate" in state.step_results
    assert state.step_results["gate"]["status"] == "paused"
|
|
|
|
def test_execute_with_shell_step(self, project_dir):
    """A single shell step completes and its stdout is stored in the run state."""
    from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
    from specify_cli.workflows.base import RunStatus

    yaml_str = """
schema_version: "1.0"
workflow:
  id: "shell-test"
  name: "Shell Test"
  version: "1.0.0"
steps:
  - id: echo
    type: shell
    run: "echo workflow-output"
"""
    definition = WorkflowDefinition.from_string(yaml_str)
    engine = WorkflowEngine(project_dir)
    state = engine.execute(definition)

    assert state.status == RunStatus.COMPLETED
    assert "workflow-output" in state.step_results["echo"]["output"]["stdout"]
|
|
|
|
def test_execute_with_if_then(self, project_dir):
    """With ``scope=full`` only the ``then`` branch's step is executed."""
    from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
    from specify_cli.workflows.base import RunStatus

    yaml_str = """
schema_version: "1.0"
workflow:
  id: "branching"
  name: "Branching"
  version: "1.0.0"
inputs:
  scope:
    type: string
    default: "full"
steps:
  - id: check
    type: if
    condition: "{{ inputs.scope == 'full' }}"
    then:
      - id: full-tasks
        type: shell
        run: "echo full"
    else:
      - id: partial-tasks
        type: shell
        run: "echo partial"
"""
    definition = WorkflowDefinition.from_string(yaml_str)
    engine = WorkflowEngine(project_dir)
    state = engine.execute(definition, {"scope": "full"})

    assert state.status == RunStatus.COMPLETED
    assert "full-tasks" in state.step_results
    assert "partial-tasks" not in state.step_results
|
|
|
|
def test_execute_missing_required_input(self, project_dir):
    """Executing without a ``required: true`` input raises ValueError."""
    from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition

    yaml_str = """
schema_version: "1.0"
workflow:
  id: "needs-input"
  name: "Needs Input"
  version: "1.0.0"
inputs:
  name:
    type: string
    required: true
steps:
  - id: step-one
    command: speckit.specify
    input:
      args: "{{ inputs.name }}"
"""
    definition = WorkflowDefinition.from_string(yaml_str)
    engine = WorkflowEngine(project_dir)

    with pytest.raises(ValueError, match="Required input"):
        engine.execute(definition, {})
|
|
|
|
def test_integration_auto_default_uses_project_integration(self, project_dir):
    """`integration: auto` should resolve to .specify/integration.json's integration."""
    from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition

    # Write a legacy-shaped state file naming "opencode" as the integration.
    specify_dir = project_dir / ".specify"
    specify_dir.mkdir(parents=True, exist_ok=True)
    (specify_dir / "integration.json").write_text(
        json.dumps({"integration": "opencode", "version": "0.7.4"}),
        encoding="utf-8",
    )

    definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
  id: "auto-default"
  name: "Auto Default"
  version: "1.0.0"
inputs:
  integration:
    type: string
    default: "auto"
""")
    engine = WorkflowEngine(project_dir)
    resolved = engine._resolve_inputs(definition, {})
    assert resolved["integration"] == "opencode"
|
|
|
|
def test_integration_auto_default_falls_back_when_no_integration_json(self, project_dir):
    """`integration: auto` should keep the literal "auto" when project state is missing.

    The engine itself must not invent an integration when
    ``.specify/integration.json`` is absent; any later validation or
    command resolution will handle an unresolved ``"auto"`` value.
    """
    from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition

    definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
  id: "auto-fallback"
  name: "Auto Fallback"
  version: "1.0.0"
inputs:
  integration:
    type: string
    default: "auto"
""")
    engine = WorkflowEngine(project_dir)
    resolved = engine._resolve_inputs(definition, {})
    assert resolved["integration"] == "auto"
|
|
|
|
def test_integration_explicit_input_overrides_auto(self, project_dir):
    """An explicit --input integration=X must win over `auto` even when integration.json exists."""
    from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition

    # The state file names a different integration than the explicit input.
    specify_dir = project_dir / ".specify"
    specify_dir.mkdir(parents=True, exist_ok=True)
    (specify_dir / "integration.json").write_text(
        json.dumps({"integration": "opencode"}),
        encoding="utf-8",
    )

    definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
  id: "explicit-wins"
  name: "Explicit Wins"
  version: "1.0.0"
inputs:
  integration:
    type: string
    default: "auto"
""")
    engine = WorkflowEngine(project_dir)
    resolved = engine._resolve_inputs(definition, {"integration": "claude"})
    assert resolved["integration"] == "claude"
|
|
|
|
def test_integration_explicit_auto_resolves_like_default(self, project_dir):
    """Passing ``integration=auto`` explicitly must resolve the sentinel,
    not pass it through as a literal — the workflow prompt advertises
    ``auto`` as a valid value, so the dispatch path must never see it.
    """
    from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition

    specify_dir = project_dir / ".specify"
    specify_dir.mkdir(parents=True, exist_ok=True)
    (specify_dir / "integration.json").write_text(
        json.dumps({"integration": "opencode"}),
        encoding="utf-8",
    )

    definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
  id: "explicit-auto"
  name: "Explicit Auto"
  version: "1.0.0"
inputs:
  integration:
    type: string
    default: "auto"
""")
    engine = WorkflowEngine(project_dir)
    resolved = engine._resolve_inputs(definition, {"integration": "auto"})
    assert resolved["integration"] == "opencode"
|
|
|
|
def test_integration_auto_ignores_malformed_integration_json(self, project_dir):
    """A malformed integration.json must not crash — fall back to the literal default."""
    from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition

    specify_dir = project_dir / ".specify"
    specify_dir.mkdir(parents=True, exist_ok=True)
    # Deliberately invalid JSON: an opening brace with an unquoted token.
    (specify_dir / "integration.json").write_text("{not json", encoding="utf-8")

    definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
  id: "auto-malformed"
  name: "Auto Malformed"
  version: "1.0.0"
inputs:
  integration:
    type: string
    default: "auto"
""")
    engine = WorkflowEngine(project_dir)
    resolved = engine._resolve_inputs(definition, {})
    assert resolved["integration"] == "auto"
|
|
|
|
def test_integration_auto_ignores_non_utf8_integration_json(self, project_dir):
    """A non-UTF8 integration.json must not crash — fall back to the literal default."""
    from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition

    specify_dir = project_dir / ".specify"
    specify_dir.mkdir(parents=True, exist_ok=True)
    # 0xFF is invalid as the leading byte of a UTF-8 sequence, so
    # ``Path.read_text(encoding="utf-8")`` raises UnicodeDecodeError.
    (specify_dir / "integration.json").write_bytes(b"\xff\xfe\x00\x00")

    definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
  id: "auto-non-utf8"
  name: "Auto Non UTF-8"
  version: "1.0.0"
inputs:
  integration:
    type: string
    default: "auto"
""")
    engine = WorkflowEngine(project_dir)
    resolved = engine._resolve_inputs(definition, {})
    assert resolved["integration"] == "auto"
|
|
|
|
def test_integration_auto_resolves_modern_normalized_state(self, project_dir):
    """`integration: auto` must resolve modern state files that record
    ``default_integration`` / ``installed_integrations`` and omit the
    legacy ``integration`` field."""
    from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition

    specify_dir = project_dir / ".specify"
    specify_dir.mkdir(parents=True, exist_ok=True)
    (specify_dir / "integration.json").write_text(
        json.dumps(
            {
                "version": "0.8.3",
                "integration_state_schema": 1,
                "default_integration": "claude",
                "installed_integrations": ["claude", "copilot"],
                "integration_settings": {},
            }
        ),
        encoding="utf-8",
    )

    definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
  id: "auto-modern"
  name: "Auto Modern"
  version: "1.0.0"
inputs:
  integration:
    type: string
    default: "auto"
""")
    engine = WorkflowEngine(project_dir)
    resolved = engine._resolve_inputs(definition, {})
    assert resolved["integration"] == "claude"
|
|
|
|
def test_integration_auto_rejects_future_state_schema(self, project_dir):
    """`integration: auto` must not silently use a state file written by a newer
    CLI (``integration_state_schema`` greater than the current supported value);
    the resolver falls back to the literal default rather than guessing."""
    from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
    from specify_cli.integration_state import INTEGRATION_STATE_SCHEMA

    specify_dir = project_dir / ".specify"
    specify_dir.mkdir(parents=True, exist_ok=True)
    (specify_dir / "integration.json").write_text(
        json.dumps(
            {
                "version": "99.0.0",
                # One past the schema imported above, i.e. "from the future".
                "integration_state_schema": INTEGRATION_STATE_SCHEMA + 1,
                "default_integration": "claude",
                "installed_integrations": ["claude"],
                "integration_settings": {},
            }
        ),
        encoding="utf-8",
    )

    definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
  id: "auto-future-schema"
  name: "Auto Future Schema"
  version: "1.0.0"
inputs:
  integration:
    type: string
    default: "auto"
""")
    engine = WorkflowEngine(project_dir)
    resolved = engine._resolve_inputs(definition, {})
    assert resolved["integration"] == "auto"
|
|
|
|
def test_default_value_is_validated_against_enum(self, project_dir):
    """Defaults must run through the same coercion/enum check as provided inputs."""
    from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition

    definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
  id: "default-enum"
  name: "Default Enum"
  version: "1.0.0"
inputs:
  scope:
    type: string
    default: "not-in-enum"
    enum: ["full", "backend-only", "frontend-only"]
""")
    engine = WorkflowEngine(project_dir)
    with pytest.raises(ValueError, match="not in allowed values"):
        engine._resolve_inputs(definition, {})
|
|
|
|
def test_default_value_is_coerced_to_declared_type(self, project_dir):
    """A numeric default declared as a string should still be coerced like a provided input."""
    from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition

    definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
  id: "default-coerce"
  name: "Default Coerce"
  version: "1.0.0"
inputs:
  retries:
    type: number
    default: "3"
""")
    engine = WorkflowEngine(project_dir)
    resolved = engine._resolve_inputs(definition, {})
    assert resolved["retries"] == 3
    assert isinstance(resolved["retries"], int)
|
|
|
|
def test_validate_workflow_rejects_invalid_default(self):
    """Authoring-time validation should reject defaults that violate enum."""
    from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow

    definition = WorkflowDefinition.from_string("""
schema_version: "1.0"
workflow:
  id: "bad-default"
  name: "Bad Default"
  version: "1.0.0"
inputs:
  scope:
    type: string
    default: "not-in-enum"
    enum: ["full", "backend-only", "frontend-only"]
steps:
  - id: noop
    type: gate
    message: "noop"
    options: [approve]
""")
    errors = validate_workflow(definition)
    assert any("invalid default" in e for e in errors), errors
|
|
|
|
def test_validate_workflow_exempts_integration_auto_sentinel(self):
    """An ``integration`` input defaulting to ``"auto"`` passes validation
    even though ``"auto"`` is not listed in the input's ``enum``.
    """
    from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow

    workflow_yaml = """
schema_version: "1.0"
workflow:
  id: "auto-ok"
  name: "Auto OK"
  version: "1.0.0"
inputs:
  integration:
    type: string
    default: "auto"
    enum: ["copilot", "claude", "gemini"]
steps:
  - id: noop
    type: gate
    message: "noop"
    options: [approve]
"""
    wf = WorkflowDefinition.from_string(workflow_yaml)

    problems = validate_workflow(wf)

    # No "invalid default" error may be reported for the sentinel.
    flagged = [msg for msg in problems if "invalid default" in msg]
    assert not flagged, problems
|
|
|
|
def test_validate_workflow_still_checks_type_for_auto_sentinel(self):
    """The ``auto`` exemption covers enum membership only: an ``integration``
    input declared ``type: number`` with default ``"auto"`` is still invalid.
    """
    from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow

    workflow_yaml = """
schema_version: "1.0"
workflow:
  id: "auto-bad-type"
  name: "Auto Bad Type"
  version: "1.0.0"
inputs:
  integration:
    type: number
    default: "auto"
steps:
  - id: noop
    type: gate
    message: "noop"
    options: [approve]
"""
    wf = WorkflowDefinition.from_string(workflow_yaml)

    problems = validate_workflow(wf)

    flagged = [msg for msg in problems if "invalid default" in msg]
    assert flagged, problems
|
|
|
|
def test_validate_workflow_rejects_bool_default_for_number_type(self):
    """A boolean default on a ``type: number`` input must be reported.

    ``bool`` is a subclass of ``int``, so ``float(True)`` would otherwise
    silently turn ``true`` into ``1``.
    """
    from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow

    workflow_yaml = """
schema_version: "1.0"
workflow:
  id: "bool-as-number"
  name: "Bool As Number"
  version: "1.0.0"
inputs:
  count:
    type: number
    default: true
steps:
  - id: noop
    type: gate
    message: "noop"
    options: [approve]
"""
    wf = WorkflowDefinition.from_string(workflow_yaml)

    problems = validate_workflow(wf)

    flagged = [msg for msg in problems if "invalid default" in msg]
    assert flagged, problems
|
|
|
|
def test_validate_workflow_rejects_non_string_default_for_string_type(self):
    """A ``type: string`` input needs a real string default.

    The unquoted YAML scalar ``5`` is a number and must be reported as an
    invalid default rather than slipping through unvalidated.
    """
    from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow

    workflow_yaml = """
schema_version: "1.0"
workflow:
  id: "number-as-string"
  name: "Number As String"
  version: "1.0.0"
inputs:
  label:
    type: string
    default: 5
steps:
  - id: noop
    type: gate
    message: "noop"
    options: [approve]
"""
    wf = WorkflowDefinition.from_string(workflow_yaml)

    problems = validate_workflow(wf)

    flagged = [msg for msg in problems if "invalid default" in msg]
    assert flagged, problems
|
|
|
|
def test_while_loop_condition_reads_latest_iteration(self, project_dir):
    """Regression: a ``while`` condition must be evaluated against the step
    output of the most recent iteration, not stale iteration-0 data.

    See https://github.com/github/spec-kit/issues/2592
    """
    import sys

    from specify_cli.workflows.base import RunStatus
    from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine

    # The loop body bumps a counter kept in a file and prints it.  The
    # condition compares stdout rather than the exit code, because a
    # non-zero exit code would mark the step FAILED and abort the run.
    #
    #   iteration 0: counter -> 1, prints "1"    -> loop continues
    #   iteration 1: counter -> 2, prints "done" -> condition false, stop
    #
    # If the condition kept reading iteration-0 stdout, the loop would
    # run for all max_iterations instead.
    counter_file = project_dir / ".counter"
    counter_file.write_text("0", encoding="utf-8")

    script_file = project_dir / "_tick.py"
    tick_source = (
        f"import pathlib; p = pathlib.Path(r'{counter_file}')\n"
        "n = int(p.read_text()) + 1; p.write_text(str(n))\n"
        "print('done' if n >= 2 else str(n), end='')\n"
    )
    script_file.write_text(tick_source, encoding="utf-8")
    py = sys.executable

    yaml_str = f"""
schema_version: "1.0"
workflow:
  id: "while-condition-update"
  name: "While Condition Update"
  version: "1.0.0"
steps:
  - id: retry-loop
    type: while
    condition: "{{{{ 'done' not in steps.attempt.output.stdout }}}}"
    max_iterations: 5
    steps:
      - id: attempt
        type: shell
        run: '"{py}" "{script_file}"'
"""
    wf = WorkflowDefinition.from_string(yaml_str)
    run = WorkflowEngine(project_dir).execute(wf)
    results = run.step_results

    assert run.status == RunStatus.COMPLETED
    # The unprefixed key reflects the latest iteration's result.
    assert results["attempt"]["output"]["stdout"] == "done"
    # The namespaced iteration-1 result is recorded as well.
    assert "retry-loop:attempt:1" in results
    # Counter is 2 (iteration 0 + iteration 1), not 5.
    assert counter_file.read_text(encoding="utf-8").strip() == "2"
|
|
|
|
def test_do_while_loop_condition_reads_latest_iteration(self, project_dir):
    """Regression: a ``do-while`` condition must also see the output of the
    most recent iteration.

    See https://github.com/github/spec-kit/issues/2592
    """
    import sys

    from specify_cli.workflows.base import RunStatus
    from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine

    counter_file = project_dir / ".counter"
    counter_file.write_text("0", encoding="utf-8")

    # The body script increments the counter file and prints "done" once
    # the counter reaches 2; before that it prints the counter itself.
    script_file = project_dir / "_tick.py"
    tick_source = (
        f"import pathlib; p = pathlib.Path(r'{counter_file}')\n"
        "n = int(p.read_text()) + 1; p.write_text(str(n))\n"
        "print('done' if n >= 2 else str(n), end='')\n"
    )
    script_file.write_text(tick_source, encoding="utf-8")
    py = sys.executable

    yaml_str = f"""
schema_version: "1.0"
workflow:
  id: "do-while-condition-update"
  name: "Do While Condition Update"
  version: "1.0.0"
steps:
  - id: retry-loop
    type: do-while
    condition: "{{{{ 'done' not in steps.attempt.output.stdout }}}}"
    max_iterations: 5
    steps:
      - id: attempt
        type: shell
        run: '"{py}" "{script_file}"'
"""
    wf = WorkflowDefinition.from_string(yaml_str)
    run = WorkflowEngine(project_dir).execute(wf)

    assert run.status == RunStatus.COMPLETED
    assert run.step_results["attempt"]["output"]["stdout"] == "done"
    assert counter_file.read_text(encoding="utf-8").strip() == "2"
|
|
|
|
def test_while_loop_runs_to_max_when_condition_stays_true(self, project_dir):
    """A ``while`` loop whose condition never turns false still runs until
    ``max_iterations`` -- copy-back must not break this path.

    See https://github.com/github/spec-kit/issues/2592
    """
    import sys

    from specify_cli.workflows.base import RunStatus
    from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine

    counter_file = project_dir / ".counter"
    counter_file.write_text("0", encoding="utf-8")

    # The body script always prints "pending", so the stdout-based
    # condition stays true on every iteration.
    script_file = project_dir / "_tick.py"
    tick_source = (
        f"import pathlib; p = pathlib.Path(r'{counter_file}')\n"
        "n = int(p.read_text()) + 1; p.write_text(str(n))\n"
        "print('pending', end='')\n"
    )
    script_file.write_text(tick_source, encoding="utf-8")
    py = sys.executable

    yaml_str = f"""
schema_version: "1.0"
workflow:
  id: "while-max-iterations"
  name: "While Max Iterations"
  version: "1.0.0"
steps:
  - id: retry-loop
    type: while
    condition: "{{{{ 'done' not in steps.tick.output.stdout }}}}"
    max_iterations: 3
    steps:
      - id: tick
        type: shell
        run: '"{py}" "{script_file}"'
"""
    wf = WorkflowDefinition.from_string(yaml_str)
    run = WorkflowEngine(project_dir).execute(wf)
    results = run.step_results

    assert run.status == RunStatus.COMPLETED
    # All 3 iterations ran (iteration 0 + 2 loop iterations).
    assert counter_file.read_text(encoding="utf-8").strip() == "3"
    # The unprefixed key holds the last iteration's result.
    assert results["tick"]["output"]["stdout"] == "pending"
    # Namespaced keys exist for the loop iterations.
    assert "retry-loop:tick:1" in results
    assert "retry-loop:tick:2" in results
|
|
|
|
def test_do_while_loop_runs_to_max_when_condition_stays_true(self, project_dir):
    """A ``do-while`` loop whose condition never turns false still runs
    until ``max_iterations``.

    See https://github.com/github/spec-kit/issues/2592
    """
    import sys

    from specify_cli.workflows.base import RunStatus
    from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine

    counter_file = project_dir / ".counter"
    counter_file.write_text("0", encoding="utf-8")

    # The body script always prints "pending", keeping the condition true.
    script_file = project_dir / "_tick.py"
    tick_source = (
        f"import pathlib; p = pathlib.Path(r'{counter_file}')\n"
        "n = int(p.read_text()) + 1; p.write_text(str(n))\n"
        "print('pending', end='')\n"
    )
    script_file.write_text(tick_source, encoding="utf-8")
    py = sys.executable

    yaml_str = f"""
schema_version: "1.0"
workflow:
  id: "do-while-max-iterations"
  name: "Do While Max Iterations"
  version: "1.0.0"
steps:
  - id: retry-loop
    type: do-while
    condition: "{{{{ 'done' not in steps.tick.output.stdout }}}}"
    max_iterations: 3
    steps:
      - id: tick
        type: shell
        run: '"{py}" "{script_file}"'
"""
    wf = WorkflowDefinition.from_string(yaml_str)
    run = WorkflowEngine(project_dir).execute(wf)

    assert run.status == RunStatus.COMPLETED
    assert counter_file.read_text(encoding="utf-8").strip() == "3"
    assert run.step_results["tick"]["output"]["stdout"] == "pending"
|
|
|
|
def test_while_loop_multi_step_body_inter_step_refs(self, project_dir):
    """In a multi-step loop body, step B must see step A's output from the
    current iteration rather than a stale earlier one.

    See https://github.com/github/spec-kit/issues/2592
    """
    import sys

    from specify_cli.workflows.base import RunStatus
    from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine

    counter_file = project_dir / ".counter"
    counter_file.write_text("0", encoding="utf-8")
    py = sys.executable

    # Step A increments the counter file and prints the new value.
    step_a_file = project_dir / "_step_a.py"
    step_a_source = (
        f"import pathlib; p = pathlib.Path(r'{counter_file}')\n"
        "n = int(p.read_text()) + 1; p.write_text(str(n))\n"
        "print(str(n), end='')\n"
    )
    step_a_file.write_text(step_a_source, encoding="utf-8")

    # Step B interpolates {{ steps.step-a.output.stdout }} into its run
    # command, so the engine has to resolve the aliased unprefixed key --
    # this is the actual inter-step check.
    yaml_str = f"""
schema_version: "1.0"
workflow:
  id: "while-multi-step"
  name: "While Multi Step"
  version: "1.0.0"
steps:
  - id: retry-loop
    type: while
    condition: "{{{{ 'done' not in steps.step-a.output.stdout }}}}"
    max_iterations: 3
    steps:
      - id: step-a
        type: shell
        run: '"{py}" "{step_a_file}"'
      - id: step-b
        type: shell
        run: "echo b-saw-{{{{ steps.step-a.output.stdout }}}}"
"""
    wf = WorkflowDefinition.from_string(yaml_str)
    run = WorkflowEngine(project_dir).execute(wf)
    results = run.step_results

    assert run.status == RunStatus.COMPLETED
    # The unprefixed key reflects the latest iteration's result.
    assert results["step-a"]["output"]["stdout"] == "3"
    # Step B saw step A's output through expression substitution.
    assert "b-saw-3" in results["step-b"]["output"]["stdout"]
    # Namespaced keys exist for the loop iterations.
    assert "retry-loop:step-a:1" in results
    assert "retry-loop:step-b:1" in results
    assert "retry-loop:step-a:2" in results
    assert "retry-loop:step-b:2" in results
|
|
|
|
|
|
# ===== context.run_id Tests =====
|
|
#
|
|
# End-to-end coverage for the `{{ context.run_id }}` template
|
|
# variable introduced in issue #2590. Locks resolution inside the
|
|
# three step types the acceptance criteria called out — shell `run:`,
|
|
# command `input.args:`, and switch `expression:` — plus the
|
|
# "workflow doesn't reference it" backward-compat path.
|
|
|
|
|
|
class TestContextRunId:
    """End-to-end tests for `{{ context.run_id }}` in workflow YAML."""

    def test_shell_run_resolves_run_id(self, project_dir):
        """A shell step's `run:` template receives the run id passed to
        `execute()`, and `state.run_id` carries the same value.
        """
        from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine

        workflow_yaml = """
schema_version: "1.0"
workflow:
  id: "stamp-run-id"
  name: "Stamp Run Id"
  version: "1.0.0"
steps:
  - id: stamp
    type: shell
    run: "echo RUN_ID={{ context.run_id }}"
"""
        wf = WorkflowDefinition.from_string(workflow_yaml)
        run = WorkflowEngine(project_dir).execute(wf, run_id="abc12345")

        assert run.run_id == "abc12345"
        echoed = run.step_results["stamp"]["output"]["stdout"]
        assert echoed.strip() == "RUN_ID=abc12345"

    def test_command_input_args_resolves_run_id(self, project_dir):
        """`input.args: "{{ context.run_id }}"` on a command step is resolved
        and recorded in the step output, with `shutil.which` patched to
        return `None` so no CLI is found. Covers the artifact-metadata use
        case from the issue.
        """
        from unittest.mock import patch

        from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine

        workflow_yaml = """
schema_version: "1.0"
workflow:
  id: "command-stamp"
  name: "Command Stamp"
  version: "1.0.0"
  integration: claude
steps:
  - id: tag-artifact
    command: speckit.specify
    input:
      args: "{{ context.run_id }}"
"""
        wf = WorkflowDefinition.from_string(workflow_yaml)
        runner = WorkflowEngine(project_dir)

        which_target = "specify_cli.workflows.steps.command.shutil.which"
        with patch(which_target, return_value=None):
            run = runner.execute(wf, run_id="cafef00d")

        # Even when dispatch fails (no CLI), the resolved input is recorded,
        # so downstream observers see the run id in artifact metadata.
        recorded_input = run.step_results["tag-artifact"]["output"]["input"]
        assert recorded_input["args"] == "cafef00d"

    def test_switch_expression_matches_on_run_id(self, project_dir):
        """A `switch` over `{{ context.run_id }}` selects the case whose key
        equals the run id, and the step nested in that case can reference
        `{{ context.run_id }}` too. This shows the run id is usable as an
        expression value and is propagated into nested step execution.
        """
        from specify_cli.workflows.base import RunStatus
        from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine

        workflow_yaml = """
schema_version: "1.0"
workflow:
  id: "switch-on-run-id"
  name: "Switch On Run Id"
  version: "1.0.0"
steps:
  - id: route
    type: switch
    expression: "{{ context.run_id }}"
    cases:
      target-run:
        - id: matched-branch
          type: shell
          run: "echo nested-run-id={{ context.run_id }}"
    default:
      - id: default-branch
        type: shell
        run: "echo defaulted"
"""
        wf = WorkflowDefinition.from_string(workflow_yaml)
        run = WorkflowEngine(project_dir).execute(wf, run_id="target-run")
        results = run.step_results

        assert run.status == RunStatus.COMPLETED
        assert results["route"]["output"]["matched_case"] == "target-run"
        assert "matched-branch" in results
        assert "default-branch" not in results
        # The nested branch sees the same run id -- propagation through the
        # recursive `_execute_steps` traversal is intact.
        nested_echo = results["matched-branch"]["output"]["stdout"]
        assert nested_echo.strip() == "nested-run-id=target-run"

    def test_workflow_without_context_reference_unchanged(self, project_dir):
        """A workflow that never mentions `{{ context.run_id }}` still
        completes and produces its usual shell output.
        """
        from specify_cli.workflows.base import RunStatus
        from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine

        workflow_yaml = """
schema_version: "1.0"
workflow:
  id: "no-context-ref"
  name: "No Context Ref"
  version: "1.0.0"
steps:
  - id: only-step
    type: shell
    run: "echo hello"
"""
        wf = WorkflowDefinition.from_string(workflow_yaml)
        run = WorkflowEngine(project_dir).execute(wf)

        assert run.status == RunStatus.COMPLETED
        echoed = run.step_results["only-step"]["output"]["stdout"]
        assert echoed.strip() == "hello"
|
|
|
|
|
|
# ===== State Persistence Tests =====
|
|
|
|
class TestRunState:
    """Test RunState persistence and loading."""

    def test_save_and_load(self, project_dir):
        """A saved ``RunState`` can be loaded back by run id with its
        workflow id, status, inputs and step results intact.
        """
        from specify_cli.workflows.base import RunStatus
        from specify_cli.workflows.engine import RunState

        state = RunState(
            run_id="test-run",
            workflow_id="test-workflow",
            project_root=project_dir,
        )
        state.status = RunStatus.RUNNING
        state.inputs = {"name": "login"}
        state.step_results = {
            "step-one": {
                "output": {"file": "spec.md"},
                "status": "completed",
            }
        }
        state.save()

        loaded = RunState.load("test-run", project_dir)
        assert loaded.run_id == "test-run"
        assert loaded.workflow_id == "test-workflow"
        assert loaded.status == RunStatus.RUNNING
        assert loaded.inputs == {"name": "login"}
        assert "step-one" in loaded.step_results

    def test_load_not_found(self, project_dir):
        """Loading an unknown run id raises ``FileNotFoundError``."""
        from specify_cli.workflows.engine import RunState

        with pytest.raises(FileNotFoundError):
            RunState.load("nonexistent", project_dir)

    def test_append_log(self, project_dir):
        """``append_log`` writes a JSON line to ``log.jsonl`` under the run
        directory, and the stored entry carries a ``timestamp`` key.
        """
        from specify_cli.workflows.engine import RunState

        state = RunState(
            run_id="log-test",
            workflow_id="test",
            project_root=project_dir,
        )
        state.append_log({"event": "test_event", "data": "hello"})

        log_file = state.runs_dir / "log.jsonl"
        assert log_file.exists()

        # Decode explicitly as UTF-8 so the test does not depend on the
        # platform's locale encoding, matching the other file reads here.
        lines = log_file.read_text(encoding="utf-8").strip().split("\n")
        entry = json.loads(lines[0])
        assert entry["event"] == "test_event"
        assert "timestamp" in entry
|
|
|
|
|
|
class TestListRuns:
    """Test listing workflow runs."""

    def test_list_empty(self, project_dir):
        """With nothing executed, ``list_runs()`` returns an empty list."""
        from specify_cli.workflows.engine import WorkflowEngine

        runner = WorkflowEngine(project_dir)
        assert runner.list_runs() == []

    def test_list_after_execution(self, project_dir):
        """After one execution, ``list_runs()`` holds exactly one entry,
        carrying the executed workflow's id.
        """
        from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine

        workflow_yaml = """
schema_version: "1.0"
workflow:
  id: "list-test"
  name: "List Test"
  version: "1.0.0"
steps:
  - id: step-one
    type: shell
    run: "echo test"
"""
        wf = WorkflowDefinition.from_string(workflow_yaml)
        runner = WorkflowEngine(project_dir)
        runner.execute(wf)

        listed = runner.list_runs()
        assert len(listed) == 1
        assert listed[0]["workflow_id"] == "list-test"
|
|
|
|
|
|
# ===== Workflow Registry Tests =====
|
|
|
|
class TestWorkflowRegistry:
    """Test WorkflowRegistry operations."""

    def test_add_and_get(self, project_dir):
        """An added entry can be fetched back and includes ``installed_at``."""
        from specify_cli.workflows.catalog import WorkflowRegistry

        reg = WorkflowRegistry(project_dir)
        reg.add("test-wf", {"name": "Test", "version": "1.0.0"})

        stored = reg.get("test-wf")
        assert stored is not None
        assert stored["name"] == "Test"
        assert "installed_at" in stored

    def test_remove(self, project_dir):
        """Removing an installed workflow makes ``is_installed`` false."""
        from specify_cli.workflows.catalog import WorkflowRegistry

        reg = WorkflowRegistry(project_dir)
        reg.add("test-wf", {"name": "Test"})
        assert reg.is_installed("test-wf")

        reg.remove("test-wf")
        assert not reg.is_installed("test-wf")

    def test_list(self, project_dir):
        """``list()`` contains every workflow id that was added."""
        from specify_cli.workflows.catalog import WorkflowRegistry

        reg = WorkflowRegistry(project_dir)
        reg.add("wf-a", {"name": "A"})
        reg.add("wf-b", {"name": "B"})

        listing = reg.list()
        assert "wf-a" in listing
        assert "wf-b" in listing

    def test_is_installed(self, project_dir):
        """``is_installed`` is false before ``add`` and true afterwards."""
        from specify_cli.workflows.catalog import WorkflowRegistry

        reg = WorkflowRegistry(project_dir)
        assert not reg.is_installed("missing")

        reg.add("exists", {"name": "Exists"})
        assert reg.is_installed("exists")

    def test_persistence(self, project_dir):
        """An entry added through one registry instance is visible to a
        second instance created for the same project directory.
        """
        from specify_cli.workflows.catalog import WorkflowRegistry

        writer = WorkflowRegistry(project_dir)
        writer.add("test-wf", {"name": "Test"})

        # Build a fresh instance over the same project directory.
        reader = WorkflowRegistry(project_dir)
        assert reader.is_installed("test-wf")
|
|
|
|
|
|
# ===== Workflow Catalog Tests =====
|
|
|
|
class TestWorkflowCatalog:
    """Test WorkflowCatalog catalog resolution."""

    def test_default_catalogs(self, project_dir):
        """With no overrides, two catalogs are active: ``default`` then ``community``."""
        from specify_cli.workflows.catalog import WorkflowCatalog

        catalog = WorkflowCatalog(project_dir)
        entries = catalog.get_active_catalogs()
        assert len(entries) == 2
        assert entries[0].name == "default"
        assert entries[1].name == "community"

    def test_env_var_override(self, project_dir, monkeypatch):
        """``SPECKIT_WORKFLOW_CATALOG_URL`` yields a single ``env-override`` catalog."""
        from specify_cli.workflows.catalog import WorkflowCatalog

        monkeypatch.setenv("SPECKIT_WORKFLOW_CATALOG_URL", "https://example.com/catalog.json")
        catalog = WorkflowCatalog(project_dir)
        entries = catalog.get_active_catalogs()
        assert len(entries) == 1
        assert entries[0].name == "env-override"
        assert entries[0].url == "https://example.com/catalog.json"

    def test_project_level_config(self, project_dir):
        """A project ``workflow-catalogs.yml`` defines the active catalogs."""
        from specify_cli.workflows.catalog import WorkflowCatalog

        config_path = project_dir / ".specify" / "workflow-catalogs.yml"
        # Write explicitly as UTF-8 so the fixture file does not depend on
        # the platform's locale encoding.
        config_path.write_text(
            yaml.dump({
                "catalogs": [{
                    "name": "custom",
                    "url": "https://example.com/wf-catalog.json",
                    "priority": 1,
                    "install_allowed": True,
                }]
            }),
            encoding="utf-8",
        )

        catalog = WorkflowCatalog(project_dir)
        entries = catalog.get_active_catalogs()
        assert len(entries) == 1
        assert entries[0].name == "custom"

    def test_validate_url_http_rejected(self, project_dir):
        """A plain ``http://`` URL on a non-local host is rejected."""
        from specify_cli.workflows.catalog import WorkflowCatalog, WorkflowValidationError

        catalog = WorkflowCatalog(project_dir)
        with pytest.raises(WorkflowValidationError, match="HTTPS"):
            catalog._validate_catalog_url("http://evil.com/catalog.json")

    def test_validate_url_localhost_http_allowed(self, project_dir):
        """``http://localhost`` URLs pass URL validation."""
        from specify_cli.workflows.catalog import WorkflowCatalog

        catalog = WorkflowCatalog(project_dir)
        # Should not raise
        catalog._validate_catalog_url("http://localhost:8080/catalog.json")

    def test_add_catalog(self, project_dir):
        """``add_catalog`` creates the project config file with the new URL."""
        from specify_cli.workflows.catalog import WorkflowCatalog

        catalog = WorkflowCatalog(project_dir)
        catalog.add_catalog("https://example.com/new-catalog.json", "my-catalog")

        config_path = project_dir / ".specify" / "workflow-catalogs.yml"
        assert config_path.exists()
        data = yaml.safe_load(config_path.read_text(encoding="utf-8"))
        assert len(data["catalogs"]) == 1
        assert data["catalogs"][0]["url"] == "https://example.com/new-catalog.json"

    def test_add_catalog_duplicate_rejected(self, project_dir):
        """Adding the same catalog URL twice raises ``WorkflowValidationError``."""
        from specify_cli.workflows.catalog import WorkflowCatalog, WorkflowValidationError

        catalog = WorkflowCatalog(project_dir)
        catalog.add_catalog("https://example.com/catalog.json")

        with pytest.raises(WorkflowValidationError, match="already configured"):
            catalog.add_catalog("https://example.com/catalog.json")

    def test_remove_catalog(self, project_dir):
        """``remove_catalog(0)`` returns the removed name and leaves one entry."""
        from specify_cli.workflows.catalog import WorkflowCatalog

        catalog = WorkflowCatalog(project_dir)
        catalog.add_catalog("https://example.com/c1.json", "first")
        catalog.add_catalog("https://example.com/c2.json", "second")

        removed = catalog.remove_catalog(0)
        assert removed == "first"

        config_path = project_dir / ".specify" / "workflow-catalogs.yml"
        data = yaml.safe_load(config_path.read_text(encoding="utf-8"))
        assert len(data["catalogs"]) == 1

    def test_remove_catalog_invalid_index(self, project_dir):
        """An out-of-range index raises ``WorkflowValidationError``."""
        from specify_cli.workflows.catalog import WorkflowCatalog, WorkflowValidationError

        catalog = WorkflowCatalog(project_dir)
        catalog.add_catalog("https://example.com/c1.json")

        with pytest.raises(WorkflowValidationError, match="out of range"):
            catalog.remove_catalog(5)

    def test_get_catalog_configs(self, project_dir):
        """Default configs are two dicts; the first is ``default`` with a
        boolean ``install_allowed``.
        """
        from specify_cli.workflows.catalog import WorkflowCatalog

        catalog = WorkflowCatalog(project_dir)
        configs = catalog.get_catalog_configs()
        assert len(configs) == 2
        assert configs[0]["name"] == "default"
        assert isinstance(configs[0]["install_allowed"], bool)
|
|
|
|
|
|
# ===== Integration Test =====
|
|
|
|
class TestWorkflowIntegration:
|
|
"""End-to-end workflow execution tests."""
|
|
|
|
def test_full_sequential_workflow(self, project_dir):
    """Run a shell step, an ``if`` step and another shell step in sequence.

    The ``feature`` input defaults to ``"login"``, so the ``then`` branch
    is expected to run and the ``else`` branch is not.
    """
    from specify_cli.workflows.base import RunStatus
    from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine

    workflow_yaml = """
schema_version: "1.0"
workflow:
  id: "e2e-test"
  name: "E2E Test"
  version: "1.0.0"
  integration: claude
inputs:
  feature:
    type: string
    default: "login"
steps:
  - id: specify
    type: shell
    run: "echo speckit.specify {{ inputs.feature }}"

  - id: check-scope
    type: if
    condition: "{{ inputs.feature == 'login' }}"
    then:
      - id: echo-full
        type: shell
        run: "echo full scope"
    else:
      - id: echo-partial
        type: shell
        run: "echo partial scope"

  - id: plan
    type: shell
    run: "echo speckit.plan"
"""
    wf = WorkflowDefinition.from_string(workflow_yaml)
    run = WorkflowEngine(project_dir).execute(wf)
    results = run.step_results

    assert run.status == RunStatus.COMPLETED
    assert "specify" in results
    assert "check-scope" in results
    # Only the ``then`` branch of the conditional produced a result.
    assert "echo-full" in results
    assert "echo-partial" not in results
    assert "plan" in results
|
|
|
|
def test_switch_workflow(self, project_dir):
|
|
"""Test switch step type in a workflow."""
|
|
from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition
|
|
from specify_cli.workflows.base import RunStatus
|
|
|
|
yaml_str = """
|
|
schema_version: "1.0"
|
|
workflow:
|
|
id: "switch-test"
|
|
name: "Switch Test"
|
|
version: "1.0.0"
|
|
inputs:
|
|
action:
|
|
type: string
|
|
default: "plan"
|
|
steps:
|
|
- id: route
|
|
type: switch
|
|
expression: "{{ inputs.action }}"
|
|
cases:
|
|
specify:
|
|
- id: do-specify
|
|
type: shell
|
|
run: "echo specify"
|
|
plan:
|
|
- id: do-plan
|
|
type: shell
|
|
run: "echo plan"
|
|
default:
|
|
- id: do-default
|
|
type: shell
|
|
run: "echo default"
|
|
"""
|
|
definition = WorkflowDefinition.from_string(yaml_str)
|
|
engine = WorkflowEngine(project_dir)
|
|
state = engine.execute(definition)
|
|
|
|
assert state.status == RunStatus.COMPLETED
|
|
assert "do-plan" in state.step_results
|
|
assert "do-specify" not in state.step_results
|