refactor: move workflow command handlers to workflows/_commands.py (PR-8/8) (#3159)

* refactor: move workflow command handlers to workflows/_commands.py (PR-8/8)

Final PR of the __init__.py split. Moves the workflow command group out of
__init__.py into the existing workflows/ package, completing the domain-dir
layout established in PR-5 (integrations), PR-6 (presets) and PR-7
(extensions).

- New workflows/_commands.py holds the four Typer apps (workflow / catalog /
  step / step-catalog), all 25 command handlers, the six workflow-only
  helpers (_parse_input_values, _workflow_run_payload, _emit_workflow_json,
  _stdout_to_stderr_when, _validate_step_id_or_exit,
  _resolve_steps_base_dir_or_exit), and a register(app) entry point.
- workflows is already a package, so no rename is needed; intra-package
  imports change from `.workflows.x` to `.x`. The only root-helper dep
  (_require_specify_project) is reached through a call-time shim so test
  monkeypatching of specify_cli._require_specify_project keeps working.
- __init__.py drops ~1445 lines (2066 -> 621); the workflow group is
  re-attached via register(app). Dead `contextlib` import removed.
- tests/test_workflows.py: import the now-relocated _stdout_to_stderr_when
  helper from its new home (workflows._commands) instead of the package root.

No behavior change. Full suite green (3847 passed), ruff clean.

* Prevent workflow state writes through symlinked storage

Workflow commands persist run state under .specify/workflows/runs, so the command-local project shim now rejects symlinked workflow storage before any workflow command proceeds. The standalone YAML path uses the same guard because it intentionally bypasses the normal project requirement while still creating workflow state under the current directory.

Constraint: Local YAML workflow runs do not require an existing .specify project directory but still create .specify/workflows/runs state

Rejected: Guard only .specify in the file-source path | .specify/workflows and runs can independently redirect writes

Confidence: high

Scope-risk: narrow

Directive: Keep workflow storage symlink checks centralized before constructing WorkflowEngine

Tested: .venv/bin/python -m pytest tests/test_workflow_run_without_project.py tests/test_workflows.py::TestWorkflowAddSymlinkGuard -v

Tested: .venv/bin/python -m py_compile src/specify_cli/workflows/_commands.py tests/test_workflow_run_without_project.py tests/test_workflows.py

Not-tested: Ruff lint; ruff is not installed in the repo virtualenv

Assisted-by: OpenAI Codex (model: GPT-5, autonomous)

* fix(workflows): pass github_hosts allowlist to GHES release asset resolver

workflow add resolved GitHub release download URLs without forwarding the
github_provider_hosts() allowlist, so resolve_github_release_asset_api_url
never treated any host as GHES. This regressed GitHub Enterprise Server
release asset resolution and diverged from presets/extensions, which already
pass github_hosts. Forward github_provider_hosts() at both the direct-URL and
catalog install call sites. The allowlist remains the anti-SSRF gate.

* fix(workflows): reject symlinked/traversal <id> dir on workflow install

Local/URL and catalog installs wrote to .specify/workflows/<id>/workflow.yml
without guarding the <id> segment. A pre-planted symlink at <id> or
<id>/workflow.yml let mkdir+copy/download follow it and write outside the
project root; a non-directory <id> made mkdir raise unhandled.

Add _safe_workflow_id_dir() to reject path traversal, symlinked or
non-directory <id>, and a symlinked workflow.yml leaf before any write.
Fold the catalog branch's existing traversal check into the helper.

* fix(workflows): harden _safe_workflow_id_dir output and leaf checks

- Reorder symlink/non-directory check before resolve() so a symlinked
  <id> reports as symlinked instead of misleading "Invalid workflow ID"
- Reject a pre-existing <id>/workflow.yml that is not a file, avoiding an
  unhandled IsADirectoryError on later write/copy2
- Escape workflow_id in Rich output to prevent markup injection; escape
  the repr (not the raw id) so repr-added backslashes cannot re-expose
  brackets, matching extensions/_commands.py hardening
- Add tests for workflow.yml-as-directory and markup-escaped invalid id

* Avoid stale lint failures from config helper imports

Move PyYAML loading into the helpers that read and write agent-context configuration, and replace the broad Any annotation with object. The runtime behavior stays the same while the module no longer exposes top-level imports that can be flagged as unused when CI analyzes a narrower code shape.

* Prevent workflow commands from targeting reserved storage

Workflow install and removal paths are derived from workflow IDs before any catalog download, local copy, or directory deletion. Validate that IDs are single workflow-id path segments and reject names reserved for workflow runtime storage so commands cannot target .specify/workflows/runs or .specify/workflows/steps.
This commit is contained in:
darion-yaphet
2026-07-01 00:03:54 +08:00
committed by GitHub
parent 36501d459f
commit 810d6fcfe1
4 changed files with 2078 additions and 1548 deletions

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -204,7 +204,91 @@ class TestWorkflowRunWithoutProject:
os.chdir(old_cwd)
assert result.exit_code != 0
assert "Refusing to use symlinked .specify path in current directory" in result.output
assert "Refusing to use symlinked .specify path" in result.output
def test_workflow_run_yaml_rejects_symlinked_workflows_dir(self, tmp_path):
"""Running local YAML should fail when .specify/workflows is a symlink."""
from typer.testing import CliRunner
from specify_cli import app
runner = CliRunner()
workflow_file = tmp_path / "test-workflow.yml"
workflow_content = {
"schema_version": "1.0",
"workflow": {
"id": "symlink-workflows-test",
"name": "Symlink Workflows Test",
"version": "1.0.0",
"description": "A workflow for symlink guard testing",
},
"steps": [{"id": "noop", "type": "shell", "run": "echo done"}],
}
workflow_file.write_text(yaml.dump(workflow_content), encoding="utf-8")
(tmp_path / ".specify").mkdir()
target_dir = tmp_path / "real-workflows-dir"
target_dir.mkdir()
try:
(tmp_path / ".specify" / "workflows").symlink_to(
target_dir, target_is_directory=True
)
except (OSError, NotImplementedError):
pytest.skip("Symlinks are not available in this environment")
old_cwd = os.getcwd()
try:
os.chdir(tmp_path)
result = runner.invoke(app, [
"workflow", "run", str(workflow_file),
], catch_exceptions=False)
finally:
os.chdir(old_cwd)
assert result.exit_code != 0
assert "Refusing to use symlinked .specify/workflows path" in result.output
def test_workflow_run_yaml_rejects_symlinked_runs_dir(self, tmp_path):
"""Running local YAML should fail when .specify/workflows/runs is a symlink."""
from typer.testing import CliRunner
from specify_cli import app
runner = CliRunner()
workflow_file = tmp_path / "test-workflow.yml"
workflow_content = {
"schema_version": "1.0",
"workflow": {
"id": "symlink-runs-test",
"name": "Symlink Runs Test",
"version": "1.0.0",
"description": "A workflow for symlink guard testing",
},
"steps": [{"id": "noop", "type": "shell", "run": "echo done"}],
}
workflow_file.write_text(yaml.dump(workflow_content), encoding="utf-8")
(tmp_path / ".specify" / "workflows").mkdir(parents=True)
target_dir = tmp_path / "real-runs-dir"
target_dir.mkdir()
try:
(tmp_path / ".specify" / "workflows" / "runs").symlink_to(
target_dir, target_is_directory=True
)
except (OSError, NotImplementedError):
pytest.skip("Symlinks are not available in this environment")
old_cwd = os.getcwd()
try:
os.chdir(tmp_path)
result = runner.invoke(app, [
"workflow", "run", str(workflow_file),
], catch_exceptions=False)
finally:
os.chdir(old_cwd)
assert result.exit_code != 0
assert "Refusing to use symlinked .specify/workflows/runs path" in result.output
def test_workflow_run_yaml_rejects_non_directory_specify_path(self, tmp_path):
"""Running local YAML should fail when .specify is not a directory."""

View File

@@ -5303,6 +5303,279 @@ class TestWorkflowStepRemoveCLI:
assert "Refusing to use symlinked step directory" in result.output
class TestWorkflowRemoveGuard:
def test_remove_rejects_traversal_registry_key(self, project_dir, monkeypatch):
"""A corrupted registry key must not let remove delete outside workflows/."""
from typer.testing import CliRunner
from specify_cli import app
from specify_cli.workflows.catalog import WorkflowRegistry
registry = WorkflowRegistry(project_dir)
registry.add("../outside", {"name": "Bad"})
outside = project_dir / ".specify" / "outside"
outside.mkdir()
sentinel = outside / "keep.txt"
sentinel.write_text("keep", encoding="utf-8")
monkeypatch.chdir(project_dir)
result = CliRunner().invoke(app, ["workflow", "remove", "../outside"])
assert result.exit_code != 0
assert "Invalid workflow ID" in result.output
assert sentinel.read_text(encoding="utf-8") == "keep"
@pytest.mark.parametrize("workflow_id", ["runs", "steps"])
def test_remove_rejects_reserved_storage_ids(
self, project_dir, monkeypatch, workflow_id
):
"""Reserved workflow storage directories must never be removable workflows."""
from typer.testing import CliRunner
from specify_cli import app
from specify_cli.workflows.catalog import WorkflowRegistry
registry = WorkflowRegistry(project_dir)
registry.add(workflow_id, {"name": "Bad"})
reserved_dir = project_dir / ".specify" / "workflows" / workflow_id
reserved_dir.mkdir(exist_ok=True)
sentinel = reserved_dir / "keep.txt"
sentinel.write_text("keep", encoding="utf-8")
monkeypatch.chdir(project_dir)
result = CliRunner().invoke(app, ["workflow", "remove", workflow_id])
assert result.exit_code != 0
assert "Invalid workflow ID" in result.output
assert sentinel.read_text(encoding="utf-8") == "keep"
@pytest.mark.skipif(not hasattr(os, "symlink"), reason="symlinks are unavailable")
def test_remove_refuses_symlinked_workflow_dir(self, project_dir, monkeypatch):
"""A symlinked workflow directory must not let remove delete its target."""
from typer.testing import CliRunner
from specify_cli import app
from specify_cli.workflows.catalog import WorkflowRegistry
registry = WorkflowRegistry(project_dir)
registry.add("test-wf", {"name": "Test"})
outside = project_dir / "outside-workflow-remove-target"
outside.mkdir(exist_ok=True)
sentinel = outside / "keep.txt"
sentinel.write_text("keep", encoding="utf-8")
(project_dir / ".specify" / "workflows" / "test-wf").symlink_to(
outside, target_is_directory=True
)
monkeypatch.chdir(project_dir)
result = CliRunner().invoke(app, ["workflow", "remove", "test-wf"])
assert result.exit_code != 0
assert "symlinked .specify/workflows/test-wf" in result.output
assert sentinel.read_text(encoding="utf-8") == "keep"
assert WorkflowRegistry(project_dir).is_installed("test-wf")
def test_remove_refuses_non_directory_workflow_path(self, project_dir, monkeypatch):
"""A file at the workflow path must fail cleanly instead of crashing."""
from typer.testing import CliRunner
from specify_cli import app
from specify_cli.workflows.catalog import WorkflowRegistry
registry = WorkflowRegistry(project_dir)
registry.add("test-wf", {"name": "Test"})
workflow_path = project_dir / ".specify" / "workflows" / "test-wf"
workflow_path.write_text("not a directory", encoding="utf-8")
monkeypatch.chdir(project_dir)
result = CliRunner().invoke(app, ["workflow", "remove", "test-wf"])
assert result.exit_code != 0
assert "exists but is not a directory" in result.output
assert workflow_path.read_text(encoding="utf-8") == "not a directory"
assert WorkflowRegistry(project_dir).is_installed("test-wf")
class TestWorkflowAddSymlinkGuard:
@pytest.mark.skipif(not hasattr(os, "symlink"), reason="symlinks are unavailable")
def test_add_refuses_symlinked_specify(self, temp_dir, monkeypatch):
"""workflow add must refuse a symlinked .specify (writes could escape root)."""
from typer.testing import CliRunner
from specify_cli import app
outside = temp_dir.parent / "outside-specify-target"
(outside / "workflows").mkdir(parents=True, exist_ok=True)
(temp_dir / ".specify").symlink_to(outside, target_is_directory=True)
monkeypatch.chdir(temp_dir)
result = CliRunner().invoke(app, ["workflow", "add", "anything.yml"])
assert result.exit_code != 0
assert "symlinked .specify" in result.output
@pytest.mark.skipif(not hasattr(os, "symlink"), reason="symlinks are unavailable")
def test_add_refuses_symlinked_workflows_dir(self, temp_dir, monkeypatch):
"""workflow add must refuse a symlinked .specify/workflows directory."""
from typer.testing import CliRunner
from specify_cli import app
(temp_dir / ".specify").mkdir()
outside = temp_dir.parent / "outside-workflows-target"
outside.mkdir(parents=True, exist_ok=True)
(temp_dir / ".specify" / "workflows").symlink_to(outside, target_is_directory=True)
monkeypatch.chdir(temp_dir)
result = CliRunner().invoke(app, ["workflow", "add", "anything.yml"])
assert result.exit_code != 0
assert "symlinked .specify/workflows" in result.output
@pytest.mark.skipif(not hasattr(os, "symlink"), reason="symlinks are unavailable")
def test_add_refuses_symlinked_id_dir(self, temp_dir, monkeypatch, sample_workflow_yaml):
"""A symlinked <id> install dir must not let a copy escape the project root."""
from typer.testing import CliRunner
from specify_cli import app
(temp_dir / ".specify" / "workflows").mkdir(parents=True)
outside = temp_dir.parent / "outside-id-target"
outside.mkdir(parents=True, exist_ok=True)
# <id> from the YAML below is "test-workflow"; plant it as a symlink.
(temp_dir / ".specify" / "workflows" / "test-workflow").symlink_to(
outside, target_is_directory=True
)
src = temp_dir / "incoming.yml"
src.write_text(sample_workflow_yaml, encoding="utf-8")
monkeypatch.chdir(temp_dir)
result = CliRunner().invoke(app, ["workflow", "add", str(src)])
assert result.exit_code != 0
# No write-through: the symlink target stays empty.
assert not (outside / "workflow.yml").exists()
@pytest.mark.skipif(not hasattr(os, "symlink"), reason="symlinks are unavailable")
def test_add_refuses_symlinked_workflow_yml_leaf(self, temp_dir, monkeypatch, sample_workflow_yaml):
"""A symlinked <id>/workflow.yml must not let copy2 write through the link."""
from typer.testing import CliRunner
from specify_cli import app
id_dir = temp_dir / ".specify" / "workflows" / "test-workflow"
id_dir.mkdir(parents=True)
outside_file = temp_dir.parent / "outside-leaf-target.yml"
outside_file.write_text("original\n", encoding="utf-8")
(id_dir / "workflow.yml").symlink_to(outside_file)
src = temp_dir / "incoming.yml"
src.write_text(sample_workflow_yaml, encoding="utf-8")
monkeypatch.chdir(temp_dir)
result = CliRunner().invoke(app, ["workflow", "add", str(src)])
assert result.exit_code != 0
# Rich may wrap the message; assert on the unbroken path fragment.
assert "test-workflow/workflow.yml" in result.output
assert "symlinked" in result.output
# The link target content is untouched.
assert outside_file.read_text(encoding="utf-8") == "original\n"
def test_add_refuses_non_directory_id(self, temp_dir, monkeypatch, sample_workflow_yaml):
"""An <id> path that already exists as a file must fail cleanly, not crash."""
from typer.testing import CliRunner
from specify_cli import app
wf_dir = temp_dir / ".specify" / "workflows"
wf_dir.mkdir(parents=True)
(wf_dir / "test-workflow").write_text("not a dir", encoding="utf-8")
src = temp_dir / "incoming.yml"
src.write_text(sample_workflow_yaml, encoding="utf-8")
monkeypatch.chdir(temp_dir)
result = CliRunner().invoke(app, ["workflow", "add", str(src)])
assert result.exit_code != 0
assert "exists but is not a directory" in result.output
assert result.exception is None or isinstance(result.exception, SystemExit)
def test_add_refuses_workflow_yml_as_directory(self, temp_dir, monkeypatch, sample_workflow_yaml):
"""A pre-existing <id>/workflow.yml *directory* must fail cleanly, not crash."""
from typer.testing import CliRunner
from specify_cli import app
id_dir = temp_dir / ".specify" / "workflows" / "test-workflow"
id_dir.mkdir(parents=True)
# Plant workflow.yml as a directory so a later write/copy2 would raise
# IsADirectoryError without the explicit non-file guard.
(id_dir / "workflow.yml").mkdir()
src = temp_dir / "incoming.yml"
src.write_text(sample_workflow_yaml, encoding="utf-8")
monkeypatch.chdir(temp_dir)
result = CliRunner().invoke(app, ["workflow", "add", str(src)])
assert result.exit_code != 0
assert "test-workflow/workflow.yml" in result.output
assert "is not a file" in result.output
# Clean exit, not an unhandled IsADirectoryError traceback.
assert result.exception is None or isinstance(result.exception, SystemExit)
def test_safe_workflow_id_dir_escapes_markup_in_invalid_id(self, temp_dir, capsys):
"""A traversal <id> carrying Rich markup must be escaped, not interpreted."""
import typer
from specify_cli.workflows._commands import _safe_workflow_id_dir
workflows_dir = temp_dir / ".specify" / "workflows"
workflows_dir.mkdir(parents=True)
# Traversal (so the "Invalid workflow ID" branch fires) plus markup.
with pytest.raises(typer.Exit):
_safe_workflow_id_dir(workflows_dir, "../[red]evil[/red]")
out = capsys.readouterr().out
# Literal bracketed text survives; Rich did not consume it as a tag.
assert "[red]evil[/red]" in out
@pytest.mark.parametrize(
"workflow_id",
[
"runs",
"steps",
"nested/workflow",
"nested\\workflow",
"bad id",
" bad-id",
"bad-id ",
],
)
def test_safe_workflow_id_dir_rejects_reserved_or_non_segment_ids(
self, temp_dir, workflow_id, capsys
):
"""Install IDs must not collide with workflow internals or create nested paths."""
import typer
from specify_cli.workflows._commands import _safe_workflow_id_dir
workflows_dir = temp_dir / ".specify" / "workflows"
workflows_dir.mkdir(parents=True)
with pytest.raises(typer.Exit):
_safe_workflow_id_dir(workflows_dir, workflow_id)
assert "Invalid workflow ID" in capsys.readouterr().out
assert not (workflows_dir / workflow_id).exists()
@pytest.mark.skipif(not hasattr(os, "symlink"), reason="symlinks are unavailable")
def test_list_refuses_symlinked_runs_dir(self, temp_dir, monkeypatch):
"""workflow commands using the project shim must refuse symlinked run storage."""
from typer.testing import CliRunner
from specify_cli import app
(temp_dir / ".specify" / "workflows").mkdir(parents=True)
outside = temp_dir.parent / "outside-runs-target"
outside.mkdir(parents=True, exist_ok=True)
(temp_dir / ".specify" / "workflows" / "runs").symlink_to(
outside, target_is_directory=True
)
monkeypatch.chdir(temp_dir)
result = CliRunner().invoke(app, ["workflow", "list"])
assert result.exit_code != 0
assert "symlinked .specify/workflows/runs" in result.output
class TestWorkflowStepAddCLI:
@pytest.mark.skipif(not hasattr(os, "symlink"), reason="symlinks are unavailable")
def test_add_rejects_symlinked_steps_base_dir(self, project_dir, monkeypatch):
@@ -5616,7 +5889,7 @@ steps:
# at the file-descriptor level, so it sees the subprocess output too.
import subprocess
import sys as _sys
from specify_cli import _stdout_to_stderr_when
from specify_cli.workflows._commands import _stdout_to_stderr_when
print("STDOUT_BEFORE")
with _stdout_to_stderr_when(True):
@@ -5635,7 +5908,7 @@ steps:
assert "PY_LEAK" in err and "SUBPROC_LEAK" in err
def test_json_redirect_inactive_is_noop(self, capfd):
from specify_cli import _stdout_to_stderr_when
from specify_cli.workflows._commands import _stdout_to_stderr_when
with _stdout_to_stderr_when(False):
print("VISIBLE_ON_STDOUT")
@@ -6256,7 +6529,7 @@ steps:
# not cleared afterwards, so a `completed`/`failed` run whose last
# executed step was a gate must NOT surface a stale gate block.
from types import SimpleNamespace
from specify_cli import _gate_outcome
from specify_cli.workflows._commands import _gate_outcome
gate_step = {
"type": "gate",
@@ -6283,7 +6556,7 @@ steps:
# message may be a non-string YAML literal (e.g. a number); the JSON
# surface normalises it so the emitted schema stays stable.
from types import SimpleNamespace
from specify_cli import _gate_outcome
from specify_cli.workflows._commands import _gate_outcome
state = SimpleNamespace(
status=SimpleNamespace(value="paused"),
@@ -6302,7 +6575,7 @@ steps:
# workflow; the JSON surface always normalises them to list[str] | None
# so the emitted schema is stable regardless of the input shape.
from types import SimpleNamespace
from specify_cli import _gate_outcome
from specify_cli.workflows._commands import _gate_outcome
def _options_payload(options):
state = SimpleNamespace(
@@ -6332,7 +6605,7 @@ steps:
# surface normalises it to str (and keeps None = no decision yet),
# consistent with the message/options normalization.
from types import SimpleNamespace
from specify_cli import _gate_outcome
from specify_cli.workflows._commands import _gate_outcome
def _choice_payload(choice):
state = SimpleNamespace(
@@ -6356,7 +6629,7 @@ steps:
# gate is still detected by its unique output signature (`on_reject`),
# so resume surfaces the gate block instead of silently dropping it.
from types import SimpleNamespace
from specify_cli import _gate_outcome
from specify_cli.workflows._commands import _gate_outcome
state = SimpleNamespace(
status=SimpleNamespace(value="paused"),
@@ -6382,7 +6655,7 @@ steps:
# A typeless record lacking the gate signature must NOT be mistaken for
# a gate (the fallback keys off `on_reject`, which only GateStep writes).
from types import SimpleNamespace
from specify_cli import _gate_outcome
from specify_cli.workflows._commands import _gate_outcome
state = SimpleNamespace(
status=SimpleNamespace(value="paused"),