mirror of
https://github.com/github/spec-kit.git
synced 2026-07-03 12:28:06 +08:00
fix(workflows): validate run_id in RunState.load before touching the … (#2813)
* fix(workflows): validate run_id in RunState.load before touching the filesystem
``RunState.load(run_id, project_root)`` interpolates ``run_id`` directly
into ``project_root / ".specify" / "workflows" / "runs" / run_id`` and
then calls ``state_path.exists()`` and ``json.load`` on the result. The
run_id is reachable from user input via ``specify workflow resume
<run_id>`` (CLI argument) and via ``SPECKIT_WORKFLOW_RUN_ID`` (env var
override on the engine's run path), so a value like ``../escape``
turns ``runs_dir`` into ``.specify/workflows/escape/`` and:
* ``state_path.exists()`` becomes a file-existence oracle for any
path the process can read.
* if a ``state.json`` exists at the traversed location (planted by
a malicious dependency, a misconfigured shared workspace, or an
older spec-kit version that happened to write there),
``json.load`` parses it and the workflow resumes under the
attacker-chosen ``workflow_id`` / step state.
* a subsequent ``state.save()`` then writes back to the traversed
location, persisting the corruption.
``RunState.__init__`` already validates ``run_id`` against
``r'^[a-zA-Z0-9][a-zA-Z0-9_-]*$'`` — but that check runs on
``state_data["run_id"]`` *after* ``load`` has already done the file
lookup, which is too late to prevent the disclosure.
This change extracts the pattern into a class-level constant
``_RUN_ID_PATTERN`` and a single ``_validate_run_id`` classmethod so
``__init__`` and ``load`` cannot drift, then calls the validator at the
top of ``load`` before any path is built. Mirrors the precedent in
``src/specify_cli/agents.py::_ensure_within_directory`` (used at line
437 of that file) which guards extension-install paths against the
same threat model.
Regression tests parametrize 9 traversal vectors (``../escape``,
``..``, ``../../etc/passwd``, ``foo/bar``, ``foo\bar``, ``.hidden``,
``-flag``, ``foo\x00bar``, empty) and plant a malicious ``state.json``
outside ``runs/`` so a missing guard would surface as a successful
load rather than the ambiguous ``FileNotFoundError``. A second test
asserts ``__init__`` and ``load`` reject the same representative
malformed ID, so future changes to one path can't silently drift from
the other.
* test(workflows): exercise RunState.load in shared-validation test, fix __init__ empty-string asymmetry
Copilot's review on this PR pointed out that
test_init_and_load_share_validation claimed to verify both entry
points share the same validation rules but never actually called
RunState.load — only __init__ and the shared
_validate_run_id helper. A regression in load (e.g. someone
deleting the cls._validate_run_id(run_id) call before the path is
built) would slip through even though __init__ and the helper
stayed aligned, defeating the whole point of the test.
Tightening the test surfaced a real asymmetry the previous version was
silently masking:
self.run_id = run_id or str(uuid.uuid4())[:8]
The truthiness fallback meant RunState(run_id="") silently
substituted a UUID and skipped validation, while
RunState.load("", project_root) correctly rejected the empty
string. The two entry points diverged on the empty-string vector.
That is exactly the drift the test name claimed to defend against —
and the original test missed it.
Changes
-------
* engine.py: __init__ now distinguishes run_id is None
(caller omitted it → auto-generate UUID) from an empty string
(caller provided it → must validate like any other value). Both
paths still flow through _validate_run_id, but only the
explicit-None case auto-generates.
* test_workflows.py: test_init_and_load_share_validation is
now parametrized over one representative vector per category from
test_load_rejects_path_traversal (parent traversal, embedded
separator, leading non-alphanumeric, empty string) and asserts that
*all three* entry points — __init__, _validate_run_id, and
load — reject the same input. Adding load to the assertion
is the substantive fix Copilot asked for; keeping __init__ and
the helper alongside it makes any future drift between the three
immediately observable instead of having to read three separate
tests.
Verification
------------
pytest tests/test_workflows.py — 168 passed (was 165 before the
parametrize expansion; __init__ empty-string vector would have
failed the new test against the old engine code, confirming the
asymmetry was real).
This commit is contained in:
@@ -281,16 +281,49 @@ def _validate_steps(
|
||||
class RunState:
|
||||
"""Manages workflow run state for persistence and resume."""
|
||||
|
||||
# ``run_id`` is interpolated into a filesystem path (``runs/<run_id>``)
|
||||
# by both ``save()`` and ``load()``. Constrain it to a charset that
|
||||
# cannot contain path separators (``/`` ``\``), parent-directory
|
||||
# segments (``..``), or NULs — anything that could escape the
|
||||
# ``.specify/workflows/runs/`` directory or be mis-interpreted by the
|
||||
# filesystem. The first-character anchor blocks IDs that start with
|
||||
# ``-`` (which would be mistaken for a CLI flag in error messages
|
||||
# and shell completions).
|
||||
_RUN_ID_PATTERN = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9_-]*$")
|
||||
|
||||
@classmethod
|
||||
def _validate_run_id(cls, run_id: str) -> None:
|
||||
"""Raise ``ValueError`` if ``run_id`` is not a safe path component.
|
||||
|
||||
This is the single source of truth for what counts as a valid
|
||||
``run_id``. ``__init__`` calls it to reject malformed IDs at
|
||||
construction time; ``load`` calls it *before* interpolating the
|
||||
ID into a path so a malicious value cannot probe or read files
|
||||
outside ``.specify/workflows/runs/<run_id>/``.
|
||||
"""
|
||||
if not isinstance(run_id, str) or not cls._RUN_ID_PATTERN.match(run_id):
|
||||
raise ValueError(
|
||||
f"Invalid run_id {run_id!r}: must be alphanumeric with "
|
||||
"hyphens/underscores only (and must start with an "
|
||||
"alphanumeric character)."
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
run_id: str | None = None,
|
||||
workflow_id: str = "",
|
||||
project_root: Path | None = None,
|
||||
) -> None:
|
||||
self.run_id = run_id or str(uuid.uuid4())[:8]
|
||||
if not re.match(r'^[a-zA-Z0-9][a-zA-Z0-9_-]*$', self.run_id):
|
||||
msg = f"Invalid run_id {self.run_id!r}: must be alphanumeric with hyphens/underscores only."
|
||||
raise ValueError(msg)
|
||||
# ``run_id is None`` (omitted) → auto-generate. An explicit empty
|
||||
# string is *not* the same as "omitted" and must be validated like
|
||||
# any other caller-provided value — otherwise ``__init__("")``
|
||||
# would silently substitute a UUID while ``load("")`` rejects, and
|
||||
# the two entry points would diverge on the empty-string vector.
|
||||
if run_id is None:
|
||||
self.run_id = str(uuid.uuid4())[:8]
|
||||
else:
|
||||
self.run_id = run_id
|
||||
self._validate_run_id(self.run_id)
|
||||
self.workflow_id = workflow_id
|
||||
self.project_root = project_root or Path(".")
|
||||
self.status = RunStatus.CREATED
|
||||
@@ -331,7 +364,20 @@ class RunState:
|
||||
|
||||
@classmethod
|
||||
def load(cls, run_id: str, project_root: Path) -> RunState:
|
||||
"""Load a run state from disk."""
|
||||
"""Load a run state from disk.
|
||||
|
||||
Validates ``run_id`` against ``_RUN_ID_PATTERN`` *before* building
|
||||
the lookup path. Without this guard, a caller passing a value like
|
||||
``../escape`` (e.g. via ``specify workflow resume`` CLI argument)
|
||||
would interpolate path-traversal segments into
|
||||
``runs_dir`` below, letting ``state_path.exists()`` probe arbitrary
|
||||
paths and ``json.load`` read attacker-planted JSON from outside
|
||||
the project's ``runs/`` directory. ``__init__`` already runs this
|
||||
check on the stored ``state_data["run_id"]``, but that fires
|
||||
*after* the file lookup — too late to prevent the disclosure.
|
||||
Mirrors the precedent in ``agents._ensure_within_directory``.
|
||||
"""
|
||||
cls._validate_run_id(run_id)
|
||||
runs_dir = project_root / ".specify" / "workflows" / "runs" / run_id
|
||||
state_path = runs_dir / "state.json"
|
||||
if not state_path.exists():
|
||||
|
||||
@@ -2716,6 +2716,112 @@ class TestRunState:
|
||||
with pytest.raises(FileNotFoundError):
|
||||
RunState.load("nonexistent", project_dir)
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"malicious_run_id",
|
||||
[
|
||||
# Parent-directory traversal — the classic path-escape vector.
|
||||
"../escape",
|
||||
"..",
|
||||
"../../etc/passwd",
|
||||
# Embedded path separators — both POSIX and Windows.
|
||||
"foo/bar",
|
||||
"foo\\bar",
|
||||
# Leading non-alphanumeric characters that the existing
|
||||
# pattern's anchor blocks (would be mistaken for CLI flags
|
||||
# or hidden files in shell completions / error messages).
|
||||
".hidden",
|
||||
"-flag",
|
||||
# NUL byte — some filesystems treat the prefix as a valid
|
||||
# path and silently truncate at the NUL.
|
||||
"foo\x00bar",
|
||||
# Empty string — degenerate case, matches no file but the
|
||||
# validator should reject it before any I/O.
|
||||
"",
|
||||
],
|
||||
)
|
||||
def test_load_rejects_path_traversal(self, project_dir, malicious_run_id):
|
||||
"""``RunState.load`` validates ``run_id`` before touching the
|
||||
filesystem.
|
||||
|
||||
Without this guard, a value like ``../escape`` passed via
|
||||
``specify workflow resume`` would interpolate path-traversal
|
||||
segments into the lookup path. ``state_path.exists()`` would
|
||||
probe arbitrary paths the process can read (a file-existence
|
||||
oracle) and ``json.load`` would happily parse attacker-planted
|
||||
JSON from outside ``.specify/workflows/runs/``. The check must
|
||||
fire *before* the path is built — ``__init__``'s identical
|
||||
regex on ``state_data["run_id"]`` fires too late.
|
||||
"""
|
||||
from specify_cli.workflows.engine import RunState
|
||||
|
||||
# Plant a state.json *outside* the legitimate ``runs/`` directory
|
||||
# at the location ``../escape`` would traverse to, so a missing
|
||||
# guard would surface as a successful load rather than a
|
||||
# ``FileNotFoundError`` (which would be ambiguous with the
|
||||
# not-found case).
|
||||
runs_dir = project_dir / ".specify" / "workflows" / "runs"
|
||||
runs_dir.mkdir(parents=True, exist_ok=True)
|
||||
attacker_dir = project_dir / ".specify" / "workflows" / "escape"
|
||||
attacker_dir.mkdir(exist_ok=True)
|
||||
(attacker_dir / "state.json").write_text(
|
||||
json.dumps(
|
||||
{
|
||||
"run_id": "pwned",
|
||||
"workflow_id": "attacker-owned",
|
||||
"status": "created",
|
||||
}
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid run_id"):
|
||||
RunState.load(malicious_run_id, project_dir)
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"bad_run_id",
|
||||
[
|
||||
# One vector per category from ``test_load_rejects_path_traversal``
|
||||
# — enough to prove both entry points agree without re-running
|
||||
# the full attack matrix here.
|
||||
"../escape", # parent-directory traversal
|
||||
"foo/bar", # embedded path separator
|
||||
".hidden", # leading non-alphanumeric
|
||||
"", # empty / degenerate
|
||||
],
|
||||
)
|
||||
def test_init_and_load_share_validation(self, project_dir, bad_run_id):
|
||||
"""``__init__`` *and* ``load`` reject the same malformed IDs.
|
||||
|
||||
The two entry points must stay in sync — drift would let an ID
|
||||
slip in via one path that the other would reject, producing
|
||||
confusing crashes mid-workflow. The previous version of this
|
||||
test only exercised ``__init__`` and ``_validate_run_id`` (the
|
||||
shared helper), so a regression in ``load`` — e.g. someone
|
||||
deleting the ``cls._validate_run_id(run_id)`` call there — could
|
||||
slip through despite ``__init__`` and the helper staying
|
||||
aligned. We now hit ``load`` directly with the same vector so
|
||||
any drift between the two call sites is caught by this test.
|
||||
"""
|
||||
from specify_cli.workflows.engine import RunState
|
||||
|
||||
# ``__init__`` rejects up front.
|
||||
with pytest.raises(ValueError, match="Invalid run_id"):
|
||||
RunState(run_id=bad_run_id)
|
||||
|
||||
# The shared helper rejects the value too (sanity check that the
|
||||
# ``__init__`` rejection came from the validator, not some
|
||||
# unrelated constructor failure).
|
||||
with pytest.raises(ValueError, match="Invalid run_id"):
|
||||
RunState._validate_run_id(bad_run_id)
|
||||
|
||||
# And ``load`` rejects it *before* touching the filesystem. This
|
||||
# is the assertion the previous version was missing: without it,
|
||||
# a regression in ``load`` (e.g. forgetting to call the
|
||||
# validator before building the path) would not be caught even
|
||||
# though ``__init__`` and the helper still agreed.
|
||||
with pytest.raises(ValueError, match="Invalid run_id"):
|
||||
RunState.load(bad_run_id, project_dir)
|
||||
|
||||
def test_append_log(self, project_dir):
|
||||
from specify_cli.workflows.engine import RunState
|
||||
|
||||
|
||||
Reference in New Issue
Block a user