Add workflow step catalog — community-installable step types (#2394)

* Initial plan

* Add workflow step catalog: StepRegistry, StepCatalog, CLI commands, and tests

Agent-Logs-Url: https://github.com/github/spec-kit/sessions/2885e646-477d-4df8-b9a3-06d8cb29e748

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>

* Potential fix for pull request finding 'An assert statement has a side-effect'

Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>

* Address PR review: path traversal, cache robustness, collision check, failed-to-load display

- Add resolve()+relative_to() path traversal guards in workflow_step_add and
  workflow_step_remove to prevent directory escape via step_id
- Harden _is_url_cache_valid in both StepCatalog and WorkflowCatalog to
  coerce fetched_at to float and catch TypeError/ValueError
- Check STEP_REGISTRY and StepRegistry before installing to prevent
  collisions with built-in step types or already-installed steps
- Show 'Custom (installed, failed to load)' section in workflow step list
  for steps in the registry that failed to load into STEP_REGISTRY
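
The resolve()+relative_to() guard from the first bullet can be sketched as below. This is a minimal illustration, not the code from this PR: the helper name `is_direct_child` is hypothetical, and it assumes the only acceptable result is a single path component directly under the base directory.

```python
from pathlib import Path


def is_direct_child(base_dir: Path, step_id: str) -> bool:
    """Return True only if base_dir/step_id resolves to a direct child of base_dir.

    Hypothetical helper for illustration; not part of spec-kit.
    """
    base = base_dir.resolve()
    candidate = (base / step_id).resolve()
    try:
        parts = candidate.relative_to(base).parts
    except ValueError:
        # The resolved path lies outside base_dir entirely.
        return False
    # Exactly one component, identical to the requested id.
    return parts == (step_id,)
```

Comparing the relative parts against `(step_id,)` also rejects ids such as `.` or `a/b`, which stay inside the base directory but are not a single child.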

* Fix StepRegistry shape validation and StepCatalog empty-YAML handling

Agent-Logs-Url: https://github.com/github/spec-kit/sessions/0dca6393-f5a9-40de-bb5c-77ba6af033d2

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>

* Polish: rename _default to default_registry, strengthen unreadable-file test

Agent-Logs-Url: https://github.com/github/spec-kit/sessions/0dca6393-f5a9-40de-bb5c-77ba6af033d2

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>

* Address PR review: atomic install, hostname validation, cache resilience, no dynamic imports in list/info

Agent-Logs-Url: https://github.com/github/spec-kit/sessions/3e18fef0-e2e6-4b3e-9e8d-9adb1e5e464e

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>

* Fix shutil.move with existing step_dir: remove before move to avoid subdirectory nesting

Agent-Logs-Url: https://github.com/github/spec-kit/sessions/3e18fef0-e2e6-4b3e-9e8d-9adb1e5e464e

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>

* Call load_custom_steps at execution time; enforce hostname in _safe_fetch and _validate_url

Agent-Logs-Url: https://github.com/github/spec-kit/sessions/73865880-fb25-4061-a43e-4e4b4d1c4de6

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>

* Wrap YAML parsing in try/except; atomic step install via os.rename() under same fs
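
The staging-then-rename idea can be sketched roughly as follows. The function name `install_atomically` and its `files` parameter are assumptions made for this example; the point is only that the staging directory is created inside the destination's parent, so both paths share a filesystem and `os.rename()` never has to copy.

```python
import os
import shutil
import tempfile
from pathlib import Path


def install_atomically(base_dir: Path, name: str, files: dict[str, bytes]) -> Path:
    """Stage files in a temp dir under base_dir, then rename it into place.

    Hypothetical helper for illustration; not part of spec-kit.
    """
    base_dir.mkdir(parents=True, exist_ok=True)
    final_dir = base_dir / name
    if final_dir.exists():
        raise FileExistsError(f"{final_dir} already exists")
    # Staging inside base_dir keeps source and destination on one filesystem.
    staging = Path(tempfile.mkdtemp(prefix="staging_", dir=base_dir))
    try:
        for rel_name, content in files.items():
            (staging / rel_name).write_bytes(content)
        os.rename(staging, final_dir)
    finally:
        # Nothing to remove after a successful rename; cleans up on failure.
        shutil.rmtree(staging, ignore_errors=True)
    return final_dir
```

If any write fails, the final directory is never created, so a retry starts from a clean state.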

Agent-Logs-Url: https://github.com/github/spec-kit/sessions/ff915bc5-ec7e-4e6a-b505-35f5795250df

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>

* Validate YAML root is a dict in _load_catalog_config and workflow_step_add; fix WorkflowCatalog hostname validation

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>

* Fix load_custom_steps() package imports and add reserved step ID validation

* Move _re/_sys imports out of loop and _RESERVED_STEP_IDS to module level

* Address review: collision-resistant module names, extra_files support, remove orphan dir

* Harden extra_files: warn on non-dict, resolve symlinks in path traversal check

* Switch _safe_fetch and StepCatalog._fetch_single_catalog to use open_url for auth consistency

* Harden step_id validation against path-segment tricks; raise on StepRegistry.save() OSError

* Clean up sys.modules on broken step packages; handle StepValidationError in step add/remove

* Address review thread: int-coerce priorities, sys.modules cleanup, _require_specify_project, registry-first remove

* fix: normalize workflow step catalog metadata fallbacks

* fix: address latest workflow step and catalog review findings

* Handle non-string extra_files keys in workflow step add

* Harden StepRegistry symlink reads and extra_files path/URL validation

* Harden custom step loader and step remove against symlinks and OSError

* Fix StepCatalog.search() to coerce non-string fields before joining

* Fix WorkflowCatalog YAML parsing error handling and isinstance checks

* Harden step registry save and custom step/catalog ID handling

* Harden cache validation and staging OSError handling

* Address review: reorder symlink guard and split mixed test

- Move symlink-parent check before is_dir() in load_custom_steps() so
  we never stat an external target through a symlink
- Split test_get_merged_steps_normalizes_list_ids_to_strings into two
  focused tests: one for list-id normalization, one for get_step_info
  return values

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address review: symlink-before-stat in loader, restore registry on rmtree failure

- load_custom_steps(): check is_symlink() before is_dir() on step
  directories so symlinked entries are skipped without statting external
  targets
- workflow_step_remove: restore the registry entry when shutil.rmtree()
  fails so filesystem and registry state stay consistent and a future
  'step add' isn't blocked
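
The ordering in the first bullet matters because `Path.is_symlink()` inspects the link itself, while `Path.is_dir()` follows it. A small sketch, using a hypothetical helper name `real_subdirectories`:

```python
from pathlib import Path


def real_subdirectories(parent: Path) -> list[Path]:
    """List child directories of parent, skipping symlinked entries.

    Hypothetical helper for illustration; not part of spec-kit.
    """
    found = []
    for entry in sorted(parent.iterdir()):
        # Check the link itself first; is_dir() would follow it to the target.
        if entry.is_symlink():
            continue
        if entry.is_dir():
            found.append(entry)
    return found
```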

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Harden step_id validation and file-write error handling

- _validate_step_id_or_exit: reject whitespace-only/padded IDs,
  Windows-invalid characters (<>:"|?*), control characters, trailing
  dots/spaces, and Windows reserved device names (con, nul, etc.)
- Wrap step.yml/__init__.py staging writes in OSError handler
- Wrap extra_files disk writes (mkdir + write_bytes) in OSError handler
  that names the failing relative path
- Registry rollback on rmtree failure: restore verbatim metadata and
  emit a warning if the restore itself fails
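
The rollback in the last bullet follows a common pattern: take a copy of the entry, remove it, attempt the destructive step, and put the saved entry back untouched if that step fails. The sketch below uses a plain dict in place of StepRegistry and a hypothetical callable `delete_dir`, so it shows only the shape of the logic.

```python
from typing import Callable


def remove_with_rollback(registry: dict, key: str, delete_dir: Callable[[], None]) -> None:
    """Remove registry[key], then run delete_dir(); restore the entry on OSError.

    Hypothetical helper for illustration; a plain dict stands in for the registry.
    """
    saved = registry.pop(key)
    try:
        delete_dir()
    except OSError:
        # Restore the saved metadata verbatim so timestamps are not rewritten.
        registry[key] = saved
        raise
```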

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address review: cache symlink guard, verbatim registry rollback, Windows test fix

- StepCatalog: add _is_cache_path_safe() guard that checks for symlinks
  in .specify/workflows/steps/.cache path; skip cache reads and writes
  when any component is symlinked to prevent writes outside project root
- Registry rollback: write metadata directly to registry.data['steps']
  and call save() instead of using add() which overwrites timestamps
- temp_dir fixture: use ignore_errors=True on Windows to avoid flaky
  teardown from locked file handles (WinError 32)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Simplify exec_module call by removing redundant nested try/except

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix empty YAML tolerance in WorkflowCatalog.add_catalog, scope ignore_errors to Windows

- WorkflowCatalog.add_catalog(): treat None from yaml.safe_load() (empty
  file) as an empty mapping instead of raising 'corrupted'
- temp_dir fixture: limit ignore_errors to sys.platform == 'win32' so
  real cleanup issues surface on non-Windows platforms

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Chain exceptions in _load_catalog_config for both catalog classes

Add 'from exc' to preserve root cause in tracebacks while keeping
clean user-facing messages.
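
A short sketch of what 'from exc' buys: the user sees the clean message, while the original parser error stays attached as `__cause__`. The example parses JSON instead of YAML to stay within the standard library, and `CatalogConfigError` and `load_config` are hypothetical stand-ins for the catalog classes.

```python
import json


class CatalogConfigError(Exception):
    """Hypothetical stand-in for the catalog error types in this PR."""


def load_config(text: str) -> dict:
    """Parse a config document, wrapping parser errors in a clean message."""
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        # 'from exc' keeps the parser error on __cause__ for tracebacks.
        raise CatalogConfigError("Catalog config is not valid JSON") from exc
    if not isinstance(data, dict):
        raise CatalogConfigError("Catalog config must be a mapping")
    return data
```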

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Make default catalog tests hermetic by isolating HOME

Monkeypatch Path.home() to project_dir and clear catalog env vars so
tests don't break on machines with a real ~/.specify/step-catalogs.yml
or ~/.specify/workflow-catalogs.yml.
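
The tests in this PR use pytest's monkeypatch; the same isolation can be sketched with only the standard library. Everything named here (`user_catalog_path`, `run_isolated`, the `env_vars_to_clear` parameter) is hypothetical and exists only to show the idea.

```python
import os
import tempfile
from pathlib import Path
from unittest import mock


def user_catalog_path() -> Path:
    """Hypothetical lookup mirroring a ~/.specify/step-catalogs.yml default."""
    return Path.home() / ".specify" / "step-catalogs.yml"


def run_isolated(func, env_vars_to_clear=()):
    """Call func with Path.home() redirected to a fresh temp directory.

    Returns (result, fake_home). The named environment variables are hidden
    for the duration of the call and restored afterwards.
    """
    with tempfile.TemporaryDirectory() as tmp:
        fake_home = Path(tmp)
        kept = {k: v for k, v in os.environ.items() if k not in env_vars_to_clear}
        with mock.patch.object(Path, "home", return_value=fake_home), \
                mock.patch.dict(os.environ, kept, clear=True):
            return func(), fake_home
```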

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix falsy ID handling in _get_merged_steps for list-based catalogs

Check for None explicitly instead of using 'or' which drops valid
falsy IDs like 0.
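
The difference is easy to see in isolation. In this sketch the helper name `entry_id` and the `fallback` parameter are assumptions; the actual logic lives in _get_merged_steps.

```python
def entry_id(entry: dict, fallback: str) -> str:
    """Return the entry's id as a string, using fallback only when it is missing.

    Hypothetical helper for illustration; not part of spec-kit.
    """
    raw = entry.get("id")
    # 'entry.get("id") or fallback' would discard a legitimate id of 0.
    if raw is None:
        raw = fallback
    return str(raw)
```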

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Compare reserved step IDs case-insensitively for filesystem safety

On case-insensitive filesystems (Windows, and macOS by default), variants like
STEP-REGISTRY.JSON would collide with the actual registry file.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Add explanatory comments to intentional empty except blocks

Document why cache-read failures are silently ignored in both
WorkflowCatalog and StepCatalog _fetch_single_catalog methods.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
Co-authored-by: Manfred Riem <mnriem@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Author: Copilot
Date: 2026-06-16 18:03:45 -05:00
Committed by GitHub
Parent: 9cd20c6c25
Commit: c52ccd7dc7
6 changed files with 2502 additions and 20 deletions

@@ -2058,6 +2058,20 @@ workflow_catalog_app = typer.Typer(
)
workflow_app.add_typer(workflow_catalog_app, name="catalog")
workflow_step_app = typer.Typer(
    name="step",
    help="Manage workflow step types",
    add_completion=False,
)
workflow_app.add_typer(workflow_step_app, name="step")

workflow_step_catalog_app = typer.Typer(
    name="catalog",
    help="Manage step catalogs",
    add_completion=False,
)
workflow_step_app.add_typer(workflow_step_catalog_app, name="catalog")
def _parse_input_values(input_values: list[str] | None) -> dict[str, Any]:
"""Parse repeated ``key=value`` CLI inputs into a dict.
@@ -2139,6 +2153,7 @@ def workflow_run(
),
):
"""Run a workflow from an installed ID or local YAML path."""
from .workflows import load_custom_steps
from .workflows.engine import WorkflowEngine
source_path = Path(source).expanduser()
@@ -2158,6 +2173,7 @@ def workflow_run(
else:
project_root = _require_specify_project()
load_custom_steps(project_root)
engine = WorkflowEngine(project_root)
if not json_output:
engine.on_step_start = lambda sid, label: console.print(f" \u25b8 [{sid}] {label} \u2026")
@@ -2227,9 +2243,11 @@ def workflow_resume(
),
):
"""Resume a paused or failed workflow run."""
from .workflows import load_custom_steps
from .workflows.engine import WorkflowEngine
project_root = _require_specify_project()
load_custom_steps(project_root)
engine = WorkflowEngine(project_root)
if not json_output:
engine.on_step_start = lambda sid, label: console.print(f" \u25b8 [{sid}] {label} \u2026")
@@ -2819,6 +2837,662 @@ def workflow_catalog_remove(
console.print(f"[green]✓[/green] Catalog source '{removed_name}' removed")
# ===== Workflow Step Commands =====
@workflow_step_app.command("list")
def workflow_step_list():
"""List installed step types (built-in and custom)."""
from .workflows import STEP_REGISTRY
from .workflows.catalog import StepRegistry
project_root = _require_specify_project()
specify_dir = project_root / ".specify"
# Read installed custom steps from registry only — no dynamic imports
installed: dict = {}
if specify_dir.exists():
registry = StepRegistry(project_root)
installed = registry.list()
console.print("\n[bold cyan]Installed Step Types:[/bold cyan]\n")
built_in = sorted(k for k in STEP_REGISTRY if k not in installed)
if built_in:
console.print(" [bold]Built-in:[/bold]")
for key in built_in:
console.print(f"{key}")
console.print()
if installed:
console.print(" [bold]Custom (installed):[/bold]")
for key in sorted(installed):
meta = installed[key] or {}
name = meta.get("name", key)
version = meta.get("version", "?")
console.print(f" • [bold]{name}[/bold] ({key}) v{version}")
console.print()
if not built_in and not installed:
console.print("[yellow]No step types found.[/yellow]")
if specify_dir.exists():
console.print(
" Install a new step type with: [cyan]specify workflow step add <id>[/cyan]"
)
# IDs that map to internal names used under .specify/workflows/steps/ and must
# not be used as custom step IDs (dotfile check is done separately at runtime).
_RESERVED_STEP_IDS: frozenset[str] = frozenset({".cache", "step-registry.json"})

# Windows reserved device names (case-insensitive, with or without extensions)
_WINDOWS_RESERVED_NAMES: frozenset[str] = frozenset({
    "con", "prn", "aux", "nul",
    "com1", "com2", "com3", "com4", "com5", "com6", "com7", "com8", "com9",
    "lpt1", "lpt2", "lpt3", "lpt4", "lpt5", "lpt6", "lpt7", "lpt8", "lpt9",
})

# Characters invalid in filenames on Windows
_WINDOWS_INVALID_CHARS: frozenset[str] = frozenset('<>:"|?*')


def _validate_step_id_or_exit(step_id: str) -> None:
    """Validate that ``step_id`` is a single safe path component.

    Rejects empty strings, whitespace-only strings, leading/trailing whitespace,
    path separators, ``.``/``..`` components, dotfile prefixes, reserved names,
    Windows-invalid filename characters, trailing dots/spaces, and Windows
    reserved device names. Exits with code 1 on failure.
    """
    # Strip the stem (before first dot) for Windows reserved-name check
    stem = step_id.split(".")[0].lower() if step_id else ""
    if (
        not step_id
        or not step_id.strip()
        or step_id != step_id.strip()
        or "/" in step_id
        or "\\" in step_id
        or step_id in (".", "..")
        or step_id.startswith(".")
        or step_id.endswith(".")
        or step_id.endswith(" ")
        or step_id.lower() in _RESERVED_STEP_IDS
        or stem in _WINDOWS_RESERVED_NAMES
        or any(c in _WINDOWS_INVALID_CHARS for c in step_id)
        or any(ord(c) < 32 for c in step_id)
    ):
        console.print(
            f"[red]Error:[/red] Invalid step id '{step_id}': must be a single safe "
            "path component (no separators, no leading dot, not a reserved name, "
            "no invalid filename characters)"
        )
        raise typer.Exit(1)
def _resolve_steps_base_dir_or_exit(project_root: Path) -> Path:
    """Resolve .specify/workflows/steps while refusing symlinked parent directories."""
    project_root_resolved = project_root.resolve()
    steps_base_dir_unresolved = project_root / ".specify" / "workflows" / "steps"
    current = project_root
    for part in (".specify", "workflows", "steps"):
        current = current / part
        if current.is_symlink():
            console.print(
                f"[red]Error:[/red] Refusing to use symlinked step directory '{current}'"
            )
            raise typer.Exit(1)
        if current.exists() and not current.is_dir():
            console.print(
                f"[red]Error:[/red] Step directory path is not a directory: '{current}'"
            )
            raise typer.Exit(1)
    steps_base_dir = steps_base_dir_unresolved.resolve()
    try:
        steps_base_dir.relative_to(project_root_resolved)
    except ValueError:
        console.print(
            f"[red]Error:[/red] Step directory escapes project root: '{steps_base_dir}'"
        )
        raise typer.Exit(1)
    return steps_base_dir
@workflow_step_app.command("add")
def workflow_step_add(
step_id: str = typer.Argument(..., help="Step type ID from catalog"),
):
"""Install a custom step type from the step catalog."""
from .workflows.catalog import StepCatalog, StepCatalogError, StepRegistry, StepValidationError
project_root = _require_specify_project()
catalog = StepCatalog(project_root)
try:
info = catalog.get_step_info(step_id)
except StepCatalogError as exc:
console.print(f"[red]Error:[/red] {exc}")
raise typer.Exit(1)
if not info:
console.print(f"[red]Error:[/red] Step type '{step_id}' not found in catalog")
raise typer.Exit(1)
if not info.get("_install_allowed", True):
console.print(
f"[yellow]Warning:[/yellow] Step type '{step_id}' is from a discovery-only catalog"
)
console.print("Direct installation is not enabled for this catalog source.")
raise typer.Exit(1)
# Reject step IDs that collide with built-in step types
from .workflows import STEP_REGISTRY as _step_reg
if step_id in _step_reg:
console.print(
f"[red]Error:[/red] Step type '{step_id}' conflicts with a built-in step type"
)
raise typer.Exit(1)
# Reject if already installed
registry = StepRegistry(project_root)
if registry.is_installed(step_id):
console.print(
f"[red]Error:[/red] Step type '{step_id}' is already installed. "
"Remove it first with: [cyan]specify workflow step remove "
f"{step_id}[/cyan]"
)
raise typer.Exit(1)
step_yml_url = info.get("step_yml_url") or info.get("url")
if not step_yml_url:
console.print(f"[red]Error:[/red] Catalog entry for '{step_id}' has no URL")
raise typer.Exit(1)
# Derive __init__.py URL: replace trailing step.yml with __init__.py
# or use explicit init_url if provided.
init_url = info.get("init_url")
if not init_url:
if step_yml_url.endswith("step.yml"):
init_url = step_yml_url[: -len("step.yml")] + "__init__.py"
else:
console.print(
f"[red]Error:[/red] Cannot derive __init__.py URL from '{step_yml_url}'. "
"Catalog entry should provide 'init_url' or a 'url' ending in 'step.yml'."
)
raise typer.Exit(1)
from urllib.parse import urlparse
from specify_cli.authentication.http import open_url as _open_url
    def _safe_fetch(url: str) -> bytes:
        parsed = urlparse(url)
        is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1")
        if parsed.scheme != "https" and not (parsed.scheme == "http" and is_localhost):
            raise ValueError(f"Refusing to fetch from non-HTTPS URL: {url}")
        if not parsed.hostname:
            raise ValueError(f"Refusing to fetch from URL with no hostname: {url}")
        with _open_url(url, timeout=30) as resp:
            final_url = resp.geturl()
            final_parsed = urlparse(final_url)
            final_is_localhost = final_parsed.hostname in ("localhost", "127.0.0.1", "::1")
            if final_parsed.scheme != "https" and not (
                final_parsed.scheme == "http" and final_is_localhost
            ):
                raise ValueError(f"Redirect to non-HTTPS URL: {final_url}")
            if not final_parsed.hostname:
                raise ValueError(f"Redirect to URL with no hostname: {final_url}")
            return resp.read()
_validate_step_id_or_exit(step_id)
steps_base_dir = _resolve_steps_base_dir_or_exit(project_root)
step_dir = (steps_base_dir / step_id).resolve()
# Defense-in-depth: ensure the resolved directory is a direct child of
# steps_base_dir even after symlink resolution.
try:
rel_parts = step_dir.relative_to(steps_base_dir).parts
except ValueError:
console.print(f"[red]Error:[/red] Invalid step id '{step_id}'")
raise typer.Exit(1)
if rel_parts != (step_id,):
console.print(f"[red]Error:[/red] Invalid step id '{step_id}'")
raise typer.Exit(1)
import shutil
import tempfile
# Refuse if step_dir already exists (e.g. leftover from a previous failed/manual
# install that wasn't registered). The user should remove it before retrying.
if step_dir.exists():
console.print(
f"[red]Error:[/red] Step directory already exists at '{step_dir}'. "
f"Remove it manually or use: [cyan]specify workflow step remove {step_id}[/cyan]"
)
raise typer.Exit(1)
# Create steps_base_dir now so the staging temp dir is on the same filesystem,
# enabling a truly atomic os.rename() below.
try:
steps_base_dir.mkdir(parents=True, exist_ok=True)
tmp_path = Path(tempfile.mkdtemp(prefix="speckit_step_tmp_", dir=steps_base_dir))
except OSError as exc:
console.print(f"[red]Error:[/red] Failed to create staging directory: {exc}")
raise typer.Exit(1)
try:
try:
step_yml_content = _safe_fetch(step_yml_url)
init_py_content = _safe_fetch(init_url)
except Exception as exc:
console.print(f"[red]Error:[/red] Failed to download step files: {exc}")
raise typer.Exit(1)
# Validate step.yml
try:
import yaml as _yaml
meta = _yaml.safe_load(step_yml_content.decode("utf-8")) or {}
except Exception as exc:
console.print(f"[red]Error:[/red] Invalid step.yml: {exc}")
raise typer.Exit(1)
if not isinstance(meta, dict):
console.print("[red]Error:[/red] step.yml must be a YAML mapping")
raise typer.Exit(1)
step_meta = meta.get("step", {})
if not isinstance(step_meta, dict):
console.print("[red]Error:[/red] step.yml 'step' field must be a mapping")
raise typer.Exit(1)
type_key = step_meta.get("type_key", "")
if not type_key:
console.print("[red]Error:[/red] step.yml missing 'step.type_key' field")
raise typer.Exit(1)
if type_key != step_id:
console.print(
f"[red]Error:[/red] step.yml type_key ({type_key!r}) does not match "
f"catalog ID ({step_id!r})"
)
raise typer.Exit(1)
# Write the two required files.
try:
(tmp_path / "step.yml").write_bytes(step_yml_content)
(tmp_path / "__init__.py").write_bytes(init_py_content)
except OSError as exc:
console.print(
f"[red]Error:[/red] Failed to write step files to staging directory: {exc}"
)
raise typer.Exit(1)
# Optionally download additional package files declared in the catalog entry
# (e.g. helper modules). Each entry in ``extra_files`` is a mapping of
# relative-path → URL. step.yml and __init__.py are ignored here (already
# written). Paths are validated to stay within the step package directory to
# prevent path-traversal attacks.
extra_files = info.get("extra_files")
if extra_files is not None and not isinstance(extra_files, dict):
console.print(
"[yellow]Warning:[/yellow] Catalog entry 'extra_files' is not a mapping; "
"additional package files will not be downloaded."
)
extra_files = {}
for rel_path, file_url in (extra_files or {}).items():
if not isinstance(rel_path, str) or not rel_path.strip():
console.print(
"[red]Error:[/red] Catalog entry 'extra_files' contains an "
"empty or non-string path key"
)
raise typer.Exit(1)
if rel_path in ("step.yml", "__init__.py"):
continue # already written above
# Reject dot-path segments ('', '.', '..') that would refer to the
# package directory itself (IsADirectoryError) or escape it.
rel_parts = Path(rel_path).parts
if not rel_parts or any(seg in ("", ".", "..") for seg in rel_parts):
console.print(
f"[red]Error:[/red] extra_files path '{rel_path}' is not a "
"valid relative file path"
)
raise typer.Exit(1)
if not isinstance(file_url, str) or not file_url.strip():
console.print(
f"[red]Error:[/red] extra_files entry '{rel_path}' has an "
"empty or non-string URL"
)
raise typer.Exit(1)
# Resolve both destination and base to handle any symlinks in tmp_path itself,
# ensuring the traversal check is robust even on non-canonical paths.
resolved_base = tmp_path.resolve()
dest = (tmp_path / rel_path).resolve()
try:
dest.relative_to(resolved_base)
except ValueError:
console.print(
f"[red]Error:[/red] extra_files path '{rel_path}' is outside "
"the step package directory"
)
raise typer.Exit(1)
try:
file_content = _safe_fetch(file_url)
except Exception as exc:
console.print(
f"[red]Error:[/red] Failed to download extra file '{rel_path}': {exc}"
)
raise typer.Exit(1)
try:
dest.parent.mkdir(parents=True, exist_ok=True)
dest.write_bytes(file_content)
except OSError as exc:
console.print(
f"[red]Error:[/red] Failed to write extra file '{rel_path}': {exc}"
)
raise typer.Exit(1)
# Atomically rename the staging directory to the final location.
# Both paths are under steps_base_dir (same filesystem), so os.rename()
# is atomic on POSIX and won't leave a partially-written directory at
# step_dir on failure.
try:
os.rename(tmp_path, step_dir)
except OSError as exc:
console.print(f"[red]Error:[/red] Failed to install step '{step_id}': {exc}")
raise typer.Exit(1)
finally:
# Clean up if the rename hasn't moved tmp_path yet (i.e. on any failure).
shutil.rmtree(tmp_path, ignore_errors=True)
step_name = info.get("name") or step_id
step_version = info.get("version") or step_meta.get("version") or "0.0.0"
# Register in step registry
registry = StepRegistry(project_root)
try:
registry.add(
step_id,
{
"name": step_name,
"version": step_version,
"description": info.get("description", step_meta.get("description", "")),
"author": info.get("author", step_meta.get("author", "")),
"source": "catalog",
"catalog_name": info.get("_catalog_name", ""),
"type_key": type_key,
},
)
except StepValidationError as exc:
# Roll back the just-installed directory so the system isn't left with
# an unregistered step package on disk after a registry write failure
# (e.g. read-only filesystem, permission denied).
shutil.rmtree(step_dir, ignore_errors=True)
console.print(f"[red]Error:[/red] {exc}")
raise typer.Exit(1)
console.print(
f"[green]✓[/green] Step type '{step_name}' ({step_id}) installed"
)
console.print(
" Use [cyan]specify workflow step list[/cyan] to verify the installation."
)
@workflow_step_app.command("remove")
def workflow_step_remove(
step_id: str = typer.Argument(..., help="Step type ID to uninstall"),
):
"""Uninstall a custom step type."""
from .workflows.catalog import StepRegistry, StepValidationError
project_root = _require_specify_project()
_validate_step_id_or_exit(step_id)
registry = StepRegistry(project_root)
in_registry = registry.is_installed(step_id)
steps_base_dir = _resolve_steps_base_dir_or_exit(project_root)
step_dir = (steps_base_dir / step_id).resolve()
# Defense-in-depth: even though _validate_step_id_or_exit rejects path
# separators, ensure that the resolved directory is a single child of
# steps_base_dir and is not steps_base_dir itself.
try:
rel_parts = step_dir.relative_to(steps_base_dir).parts
except ValueError:
console.print(f"[red]Error:[/red] Invalid step id '{step_id}'")
raise typer.Exit(1)
if rel_parts != (step_id,):
console.print(f"[red]Error:[/red] Invalid step id '{step_id}'")
raise typer.Exit(1)
dir_exists = step_dir.exists()
if not in_registry and not dir_exists:
console.print(f"[red]Error:[/red] Step type '{step_id}' is not installed")
raise typer.Exit(1)
if not in_registry and dir_exists:
# The registry was likely reset due to corruption. Warn the user that the
# directory is being removed even though there is no registry entry, so
# the orphaned package can be cleaned up and a fresh install attempted.
console.print(
f"[yellow]Warning:[/yellow] '{step_id}' has no registry entry "
"(registry may have been reset). Removing the orphaned directory."
)
if dir_exists and not in_registry:
# No registry write needed; just delete the orphaned directory.
import shutil
try:
shutil.rmtree(step_dir)
except OSError as exc:
console.print(
f"[red]Error:[/red] Failed to remove step directory {step_dir}: {exc}"
)
raise typer.Exit(1)
elif in_registry:
# Remove the registry entry, then the directory. If the directory
# delete fails, restore the registry entry so state stays consistent
# and a future `step add` isn't blocked by an orphaned directory
# with no registry entry.
registry_metadata = registry.get(step_id)
try:
registry.remove(step_id)
except StepValidationError as exc:
console.print(f"[red]Error:[/red] {exc}")
raise typer.Exit(1)
if dir_exists:
import shutil
try:
shutil.rmtree(step_dir)
except OSError as exc:
# Restore the original registry entry verbatim (bypass add()
# which would overwrite timestamps).
try:
if registry_metadata is not None:
registry.data["steps"][step_id] = registry_metadata
registry.save()
except Exception as restore_exc: # noqa: BLE001
console.print(
f"[yellow]Warning:[/yellow] Failed to restore registry entry "
f"for '{step_id}' after directory removal failure: {restore_exc}"
)
console.print(
f"[red]Error:[/red] Failed to remove step directory {step_dir}: {exc}"
)
raise typer.Exit(1)
console.print(f"[green]✓[/green] Step type '{step_id}' uninstalled")
@workflow_step_app.command("search")
def workflow_step_search(
query: str | None = typer.Argument(None, help="Search query"),
):
"""Search the step type catalog."""
from .workflows.catalog import StepCatalog, StepCatalogError
project_root = _require_specify_project()
catalog = StepCatalog(project_root)
try:
results = catalog.search(query=query)
except StepCatalogError as exc:
console.print(f"[red]Error:[/red] {exc}")
raise typer.Exit(1)
if not results:
if query:
console.print(f"[yellow]No step types found matching '{query}'.[/yellow]")
else:
console.print("[yellow]No step types found in catalog.[/yellow]")
return
console.print(f"\n[bold cyan]Step Types ({len(results)}):[/bold cyan]\n")
for step in results:
install_note = (
"" if step.get("_install_allowed", True) else " [dim](discovery only)[/dim]"
)
console.print(
f" [bold]{step.get('name', step.get('id', '?'))}[/bold]"
f" ({step.get('id', '?')}) v{step.get('version', '?')}{install_note}"
)
desc = step.get("description", "")
if desc:
console.print(f" {desc}")
console.print()
@workflow_step_app.command("info")
def workflow_step_info(
step_id: str = typer.Argument(..., help="Step type ID"),
):
"""Show details for a step type."""
from .workflows import STEP_REGISTRY
from .workflows.catalog import StepCatalog, StepCatalogError, StepRegistry
project_root = _require_specify_project()
registry = StepRegistry(project_root)
installed_meta = registry.get(step_id)
# Check if it's a built-in
builtin_step = STEP_REGISTRY.get(step_id)
is_builtin = builtin_step is not None and not installed_meta
if is_builtin:
console.print(f"\n[bold cyan]{step_id}[/bold cyan] [dim](built-in)[/dim]")
console.print(f" Type key: {step_id}")
console.print(" [green]Built-in step type[/green]")
return
if installed_meta:
console.print(
f"\n[bold cyan]{installed_meta.get('name', step_id)}[/bold cyan] ({step_id})"
)
console.print(f" Version: {installed_meta.get('version', '?')}")
if installed_meta.get("author"):
console.print(f" Author: {installed_meta['author']}")
if installed_meta.get("description"):
console.print(f" Description: {installed_meta['description']}")
console.print(" [green]Installed[/green]")
return
# Try catalog
catalog = StepCatalog(project_root)
try:
info = catalog.get_step_info(step_id)
except StepCatalogError:
info = None
if info:
console.print(
f"\n[bold cyan]{info.get('name', step_id)}[/bold cyan] ({step_id})"
)
console.print(f" Version: {info.get('version', '?')}")
if info.get("author"):
console.print(f" Author: {info['author']}")
if info.get("description"):
console.print(f" Description: {info['description']}")
console.print(" [yellow]Not installed[/yellow]")
console.print(
f"\n Install with: [cyan]specify workflow step add {step_id}[/cyan]"
)
else:
console.print(f"[red]Error:[/red] Step type '{step_id}' not found")
raise typer.Exit(1)
@workflow_step_catalog_app.command("list")
def workflow_step_catalog_list():
"""List configured step catalog sources."""
from .workflows.catalog import StepCatalog, StepCatalogError
project_root = _require_specify_project()
catalog = StepCatalog(project_root)
try:
configs = catalog.get_catalog_configs()
except StepCatalogError as exc:
console.print(f"[red]Error:[/red] {exc}")
raise typer.Exit(1)
console.print("\n[bold cyan]Step Catalog Sources:[/bold cyan]\n")
for i, cfg in enumerate(configs):
install_status = (
"[green]install allowed[/green]"
if cfg["install_allowed"]
else "[yellow]discovery only[/yellow]"
)
console.print(f" [{i}] [bold]{cfg['name']}[/bold] — {install_status}")
console.print(f" {cfg['url']}")
if cfg.get("description"):
console.print(f" [dim]{cfg['description']}[/dim]")
console.print()
@workflow_step_catalog_app.command("add")
def workflow_step_catalog_add(
url: str = typer.Argument(..., help="Catalog URL to add"),
name: str = typer.Option(None, "--name", help="Catalog name"),
):
"""Add a step catalog source."""
from .workflows.catalog import StepCatalog, StepValidationError
project_root = _require_specify_project()
catalog = StepCatalog(project_root)
try:
catalog.add_catalog(url, name)
except StepValidationError as exc:
console.print(f"[red]Error:[/red] {exc}")
raise typer.Exit(1)
console.print(f"[green]✓[/green] Step catalog source added: {url}")
@workflow_step_catalog_app.command("remove")
def workflow_step_catalog_remove(
index: int = typer.Argument(
..., help="Catalog index to remove (from 'step catalog list')"
),
):
"""Remove a step catalog source by index."""
from .workflows.catalog import StepCatalog, StepValidationError
project_root = _require_specify_project()
catalog = StepCatalog(project_root)
try:
removed_name = catalog.remove_catalog(index)
except StepValidationError as exc:
console.print(f"[red]Error:[/red] {exc}")
raise typer.Exit(1)
console.print(f"[green]✓[/green] Step catalog source '{removed_name}' removed")
def main():
# On Windows the default stdout/stderr code page (e.g. cp1252) cannot encode
# the Rich banner and box-drawing glyphs, so the CLI crashes with

@@ -7,10 +7,12 @@ Provides:
- ``STEP_REGISTRY`` — maps ``type_key`` to ``StepBase`` subclass instances.
- ``WorkflowEngine`` — orchestrator that loads, validates, and executes
workflow YAML definitions.
- ``load_custom_steps`` — loads community-installed step types into STEP_REGISTRY.
"""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
@@ -66,3 +68,134 @@ def _register_builtin_steps() -> None:
_register_builtin_steps()
def load_custom_steps(project_root: Path) -> list[str]:
"""Load community-installed custom step types into STEP_REGISTRY.
Scans ``.specify/workflows/steps/`` for installed step packages.
Each valid package must contain ``step.yml`` (with a ``step.type_key``
field) and ``__init__.py`` (defining a ``StepBase`` subclass whose ``type_key`` matches).
Returns a list of type_keys that were successfully loaded.
Silently skips packages that fail to import or validate.
"""
import hashlib as _hashlib
import importlib.util as _importlib_util
import re as _re
import sys as _sys
steps_dir = Path(project_root) / ".specify" / "workflows" / "steps"
# Defense-in-depth: refuse to execute step code when any component of the
# .specify/workflows/steps path is a symlink, which could redirect
# the import outside the project root and bypass the install-time
# symlink guard. Check symlinks *before* is_dir() since the latter
# follows symlinks and would stat an external target.
_current = Path(project_root)
for _part in (".specify", "workflows", "steps"):
_current = _current / _part
if _current.is_symlink():
return []
if not steps_dir.is_dir():
return []
loaded: list[str] = []
for step_dir in steps_dir.iterdir():
# Check symlinks before is_dir() since the latter follows symlinks
# and would stat an external target through a symlinked directory.
if step_dir.is_symlink():
continue
if not step_dir.is_dir():
continue
step_yml = step_dir / "step.yml"
init_py = step_dir / "__init__.py"
if step_yml.is_symlink() or init_py.is_symlink():
continue
if not step_yml.is_file() or not init_py.is_file():
continue
try:
import yaml as _yaml
meta = _yaml.safe_load(step_yml.read_text(encoding="utf-8")) or {}
step_meta = meta.get("step", {})
type_key = step_meta.get("type_key", "")
if not type_key:
continue
# Skip if already registered (e.g. built-in or previously loaded)
if type_key in STEP_REGISTRY:
continue
# Sanitize type_key so the synthetic module name is a valid identifier
# (e.g. "test-custom" → "_speckit_custom_step_test_custom_<hash>").
# The 8-char SHA-256 hash of the original type_key makes the name
# collision-resistant when different type_keys produce the same
# sanitized form (e.g. "a-b" and "a_b" both sanitize to "a_b" but
# have different hashes).
safe_key = _re.sub(r"[^A-Za-z0-9_]", "_", type_key)
key_hash = _hashlib.sha256(type_key.encode()).hexdigest()[:8]
module_name = f"_speckit_custom_step_{safe_key}_{key_hash}"
# Treat the step directory as a proper package so that relative
# imports inside the step (e.g. ``from .helpers import …``) work.
spec = _importlib_util.spec_from_file_location(
module_name,
init_py,
submodule_search_locations=[str(step_dir)],
)
if spec is None or spec.loader is None:
continue
module = _importlib_util.module_from_spec(spec)
module.__package__ = module_name
# Register before exec so relative imports resolve correctly.
_sys.modules[module_name] = module
registered = False
try:
spec.loader.exec_module(module) # type: ignore[union-attr]
# Find the StepBase subclass in the module
from .base import StepBase as _StepBase
step_class = None
for attr_name in dir(module):
attr = getattr(module, attr_name)
try:
if (
isinstance(attr, type)
and issubclass(attr, _StepBase)
and attr is not _StepBase
and getattr(attr, "type_key", "") == type_key
):
step_class = attr
break
except TypeError:
continue
if step_class is None:
continue
_register_step(step_class())
loaded.append(type_key)
registered = True
finally:
# If the step wasn't successfully registered (failed import,
# no matching StepBase subclass, or registration error), remove
# the synthetic module — and any submodules loaded via relative
# imports (e.g. ``from .helpers import …``) — from sys.modules so
# a broken/skipped step package leaves no lingering import state
# behind.
if not registered:
_sys.modules.pop(module_name, None)
submodule_prefix = module_name + "."
for _mod_key in [
k for k in _sys.modules if k.startswith(submodule_prefix)
]:
_sys.modules.pop(_mod_key, None)
except Exception: # noqa: BLE001
# Silently skip broken step packages at load time
continue
return loaded
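The synthetic module naming used by ``load_custom_steps`` can be sketched in isolation. This is a minimal illustration, not the project's API: the helper name ``synthetic_module_name`` is hypothetical, and only the regex, the 8-character SHA-256 prefix, and the ``_speckit_custom_step_`` prefix are taken from the diff above.

```python
import hashlib
import re


def synthetic_module_name(type_key: str) -> str:
    """Hypothetical helper: build an importable, collision-resistant module name."""
    # Replace every character that cannot appear in a Python identifier.
    safe_key = re.sub(r"[^A-Za-z0-9_]", "_", type_key)
    # Hash the *original* key so keys that sanitize identically still differ.
    key_hash = hashlib.sha256(type_key.encode()).hexdigest()[:8]
    return f"_speckit_custom_step_{safe_key}_{key_hash}"


name_dash = synthetic_module_name("a-b")
name_under = synthetic_module_name("a_b")

# Both names share the sanitized stem "a_b" but end in different hash suffixes.
print(name_dash)
print(name_under)
```

Hashing the unsanitized key is the design choice that matters here: the sanitized stem keeps the name readable in tracebacks, while the suffix keeps two distinct ``type_key`` values from sharing one ``sys.modules`` entry.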


@@ -1,9 +1,10 @@
-"""Workflow catalog — discovery, install, and management of workflows.
+"""Workflow catalog — discovery, install, and management of workflows and step types.
 Mirrors the existing extension/preset catalog pattern with:
 - Multi-catalog stack (env var → project → user → built-in)
 - SHA256-hashed per-URL caching with 1-hour TTL
 - Workflow registry for installed workflow tracking
+- Step registry for installed custom step type tracking
 - Search across all configured catalog sources
 """
@@ -165,7 +166,7 @@ class WorkflowCatalog:
f"Catalog URL must use HTTPS (got {parsed.scheme}://). "
"HTTP is only allowed for localhost."
)
-if not parsed.netloc:
+if not parsed.hostname:
raise WorkflowValidationError(
"Catalog URL must be a valid URL with a host."
)
@@ -181,6 +182,11 @@ class WorkflowCatalog:
except (yaml.YAMLError, OSError, UnicodeError) as exc:
raise WorkflowValidationError(
f"Failed to read catalog config {config_path}: {exc}"
) from exc
if not isinstance(data, dict):
raise WorkflowValidationError(
f"Invalid catalog config: expected a mapping, "
f"got {type(data).__name__}"
)
catalogs_data = data.get("catalogs", [])
if not catalogs_data:
@@ -302,9 +308,9 @@ class WorkflowCatalog:
try:
with open(meta_file, encoding="utf-8") as f:
meta = json.load(f)
-fetched_at = meta.get("fetched_at", 0)
+fetched_at = float(meta.get("fetched_at", 0))
 return (time.time() - fetched_at) < self.CACHE_DURATION
-except (json.JSONDecodeError, OSError):
+except (json.JSONDecodeError, OSError, TypeError, ValueError):
return False
def _fetch_single_catalog(
@@ -318,6 +324,7 @@ class WorkflowCatalog:
with open(cache_file, encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
# Ignore invalid/unreadable cache and fall back to fetching from source.
pass
# Fetch from URL — validate scheme before opening and after redirects
@@ -333,6 +340,10 @@ class WorkflowCatalog:
raise WorkflowCatalogError(
f"Refusing to fetch catalog from non-HTTPS URL: {url}"
)
if not parsed.hostname:
raise WorkflowCatalogError(
f"Refusing to fetch catalog from URL with no hostname: {url}"
)
_validate_catalog_url(entry.url)
@@ -347,6 +358,7 @@ class WorkflowCatalog:
with open(cache_file, encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, ValueError, OSError):
# Stale-cache read failed; let the original fetch error propagate.
pass
raise WorkflowCatalogError(
f"Failed to fetch catalog from {entry.url}: {exc}"
@@ -358,11 +370,14 @@ class WorkflowCatalog:
)
# Write cache
-self.cache_dir.mkdir(parents=True, exist_ok=True)
-with open(cache_file, "w", encoding="utf-8") as f:
-json.dump(data, f, indent=2)
-with open(meta_file, "w", encoding="utf-8") as f:
-json.dump({"url": entry.url, "fetched_at": time.time()}, f)
+try:
+self.cache_dir.mkdir(parents=True, exist_ok=True)
+with open(cache_file, "w", encoding="utf-8") as f:
+json.dump(data, f, indent=2)
+with open(meta_file, "w", encoding="utf-8") as f:
+json.dump({"url": entry.url, "fetched_at": time.time()}, f)
+except OSError:
+pass # Proceed without caching if disk write fails
return data
@@ -468,7 +483,14 @@ class WorkflowCatalog:
data: dict[str, Any] = {"catalogs": []}
if config_path.exists():
-raw = yaml.safe_load(config_path.read_text(encoding="utf-8"))
+try:
+raw = yaml.safe_load(config_path.read_text(encoding="utf-8"))
+except (yaml.YAMLError, OSError, UnicodeDecodeError) as exc:
+raise WorkflowValidationError(
+f"Catalog config file is unreadable or malformed: {exc}"
+) from exc
+if raw is None:
+raw = {"catalogs": []}
if not isinstance(raw, dict):
raise WorkflowValidationError(
"Catalog config file is corrupted (expected a mapping)."
@@ -487,9 +509,21 @@ class WorkflowCatalog:
f"Catalog URL already configured: {url}"
)
-# Derive priority from the highest existing priority + 1
+# Derive priority from the highest existing priority + 1.
+# Coerce existing priorities to int with a safe fallback so a user-edited
+# workflow-catalogs.yml with a non-integer priority (e.g. "1") doesn't blow up.
+def _coerce_priority(value: Any) -> int:
+try:
+return int(value)
+except (TypeError, ValueError):
+return 0
 max_priority = max(
-(cat.get("priority", 0) for cat in catalogs if isinstance(cat, dict)),
+(
+_coerce_priority(cat.get("priority", 0))
+for cat in catalogs
+if isinstance(cat, dict)
+),
default=0,
)
catalogs.append(
@@ -503,9 +537,14 @@ class WorkflowCatalog:
)
data["catalogs"] = catalogs
-config_path.parent.mkdir(parents=True, exist_ok=True)
-with open(config_path, "w", encoding="utf-8") as f:
-yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True)
+try:
+config_path.parent.mkdir(parents=True, exist_ok=True)
+with open(config_path, "w", encoding="utf-8") as f:
+yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True)
+except OSError as exc:
+raise WorkflowValidationError(
+f"Failed to write catalog config {config_path}: {exc}"
+) from exc
def remove_catalog(self, index: int) -> str:
"""Remove a catalog source by index (0-based). Returns the removed name."""
@@ -513,7 +552,12 @@ class WorkflowCatalog:
if not config_path.exists():
raise WorkflowValidationError("No catalog config file found.")
-data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
+try:
+data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
+except (yaml.YAMLError, OSError, UnicodeDecodeError) as exc:
+raise WorkflowValidationError(
+f"Catalog config file is unreadable or malformed: {exc}"
+) from exc
if not isinstance(data, dict):
raise WorkflowValidationError(
"Catalog config file is corrupted (expected a mapping)."
@@ -532,8 +576,623 @@ class WorkflowCatalog:
removed = catalogs.pop(index)
data["catalogs"] = catalogs
-with open(config_path, "w", encoding="utf-8") as f:
-yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True)
+try:
+with open(config_path, "w", encoding="utf-8") as f:
+yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True)
+except OSError as exc:
+raise WorkflowValidationError(
+f"Failed to write catalog config {config_path}: {exc}"
+) from exc
if isinstance(removed, dict):
return removed.get("name", f"catalog-{index + 1}")
return f"catalog-{index + 1}"
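The priority derivation added to ``WorkflowCatalog.add_catalog`` in the hunk above can be tried on its own. The sketch below assumes a hand-edited config whose entries carry mixed priority types; the sample ``catalogs`` list is invented for illustration, and ``coerce_priority`` mirrors the nested ``_coerce_priority`` helper from the diff.

```python
from typing import Any


def coerce_priority(value: Any) -> int:
    """Return value as an int, falling back to 0 for anything non-numeric."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return 0


# Illustrative entries as they might look after a user edits the YAML by hand.
catalogs: list[Any] = [
    {"name": "official", "priority": 2},
    {"name": "quoted", "priority": "5"},    # string that parses as an int
    {"name": "missing", "priority": None},  # int(None) raises TypeError
    {"name": "bad", "priority": "high"},    # int("high") raises ValueError
    "not-a-dict",                           # skipped by the isinstance filter
]

max_priority = max(
    (coerce_priority(cat.get("priority", 0)) for cat in catalogs if isinstance(cat, dict)),
    default=0,
)
next_priority = max_priority + 1
print(next_priority)
```

Without the coercion, ``max()`` would be asked to compare ``int`` and ``str`` values and raise ``TypeError``, which is the failure the comment in the diff refers to.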
# ---------------------------------------------------------------------------
# Step catalog errors
# ---------------------------------------------------------------------------
class StepCatalogError(Exception):
"""Base error for step catalog operations."""
class StepValidationError(StepCatalogError):
"""Validation error for step catalog config or step data."""
# ---------------------------------------------------------------------------
# StepCatalogEntry
# ---------------------------------------------------------------------------
@dataclass
class StepCatalogEntry:
"""Represents a single step catalog source in the catalog stack."""
url: str
name: str
priority: int
install_allowed: bool
description: str = ""
# ---------------------------------------------------------------------------
# StepRegistry
# ---------------------------------------------------------------------------
class StepRegistry:
"""Manages the registry of installed custom step types.
Tracks installed step types and their metadata in
``.specify/workflows/steps/step-registry.json``.
"""
REGISTRY_FILE = "step-registry.json"
SCHEMA_VERSION = "1.0"
def __init__(self, project_root: Path) -> None:
self.project_root = project_root
self.steps_dir = project_root / ".specify" / "workflows" / "steps"
self.registry_path = self.steps_dir / self.REGISTRY_FILE
self.data = self._load()
def _has_symlinked_parent(self) -> bool:
"""Return True if any component of the .specify/workflows/steps path is a symlink."""
current = self.project_root
for part in (".specify", "workflows", "steps"):
current = current / part
if current.is_symlink():
return True
return False
def _load(self) -> dict[str, Any]:
"""Load registry from disk or create default."""
default_registry: dict[str, Any] = {"schema_version": self.SCHEMA_VERSION, "steps": {}}
# Defense-in-depth: refuse to read the registry if any component of the
# .specify/workflows/steps path is a symlink, which could redirect the
# read outside the project root.
if self._has_symlinked_parent():
return default_registry
# Defense-in-depth: also refuse to read a symlinked registry file,
# which could redirect the read outside the project root.
if self.registry_path.is_symlink():
return default_registry
if self.registry_path.exists():
try:
with open(self.registry_path, encoding="utf-8") as f:
data = json.load(f)
# Validate shape: must be a dict with a dict "steps" field
if not isinstance(data, dict):
return default_registry
if not isinstance(data.get("steps"), dict):
data["steps"] = {}
return data
except (json.JSONDecodeError, ValueError, OSError, UnicodeError):
return default_registry
return default_registry
def save(self) -> None:
"""Persist registry to disk.
Raises ``StepValidationError`` with a clear message on filesystem
errors (read-only fs, permission denied, ...) so callers can surface
a clean error to the user rather than an unhandled ``OSError``.
"""
if self._has_symlinked_parent() or self.registry_path.is_symlink():
raise StepValidationError(
"Refusing to write step registry through a symlinked path."
)
try:
self.steps_dir.mkdir(parents=True, exist_ok=True)
with open(self.registry_path, "w", encoding="utf-8") as f:
json.dump(self.data, f, indent=2)
except OSError as exc:
raise StepValidationError(
f"Failed to write step registry at {self.registry_path}: {exc}"
) from exc
def add(self, step_id: str, metadata: dict[str, Any]) -> None:
"""Add or update an installed step entry."""
import copy
from datetime import datetime, timezone
existing = self.data["steps"].get(step_id, {})
metadata_to_store = copy.deepcopy(metadata)
metadata_to_store["installed_at"] = existing.get(
"installed_at", datetime.now(timezone.utc).isoformat()
)
metadata_to_store["updated_at"] = datetime.now(timezone.utc).isoformat()
self.data["steps"][step_id] = metadata_to_store
self.save()
def remove(self, step_id: str) -> bool:
"""Remove an installed step entry. Returns True if found."""
if step_id in self.data["steps"]:
del self.data["steps"][step_id]
self.save()
return True
return False
def get(self, step_id: str) -> dict[str, Any] | None:
"""Get metadata for an installed step."""
return self.data["steps"].get(step_id)
def list(self) -> dict[str, dict[str, Any]]:
"""Return all installed steps."""
return dict(self.data["steps"])
def is_installed(self, step_id: str) -> bool:
"""Check if a step is installed."""
return step_id in self.data["steps"]
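The component-by-component symlink check that ``StepRegistry`` uses before reading or writing can be exercised against a throwaway directory. This is a sketch under assumptions: ``has_symlinked_component`` is a hypothetical standalone version of ``_has_symlinked_parent``, and it needs a platform where the current user may create symlinks.

```python
import os
import tempfile
from pathlib import Path


def has_symlinked_component(project_root: Path, parts: tuple[str, ...]) -> bool:
    """Walk the path one component at a time and report the first symlink."""
    current = project_root
    for part in parts:
        current = current / part
        # is_symlink() inspects the link itself and is False for missing paths.
        if current.is_symlink():
            return True
    return False


parts = (".specify", "workflows", "steps")

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "project"
    outside = Path(tmp) / "outside"
    (root / ".specify").mkdir(parents=True)
    outside.mkdir()

    # Only a real .specify directory exists so far: no symlink on the path.
    before = has_symlinked_component(root, parts)

    # Redirect .specify/workflows to a directory outside the project root.
    os.symlink(outside, root / ".specify" / "workflows", target_is_directory=True)
    after = has_symlinked_component(root, parts)

print(before, after)
```

Checking each component with ``is_symlink()`` rather than calling ``is_dir()`` or ``resolve()`` first means the guard never follows the link it is trying to detect.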
# ---------------------------------------------------------------------------
# StepCatalog
# ---------------------------------------------------------------------------
class StepCatalog:
"""Manages step catalog fetching, caching, and searching.
Resolution order for catalog sources:
1. ``SPECKIT_STEP_CATALOG_URL`` env var (overrides all)
2. Project-level ``.specify/step-catalogs.yml``
3. User-level ``~/.specify/step-catalogs.yml``
4. Built-in defaults (official + community)
"""
DEFAULT_CATALOG_URL = (
"https://raw.githubusercontent.com/github/spec-kit/main/"
"workflows/step-catalog.json"
)
COMMUNITY_CATALOG_URL = (
"https://raw.githubusercontent.com/github/spec-kit/main/"
"workflows/step-catalog.community.json"
)
CACHE_DURATION = 3600 # 1 hour
def __init__(self, project_root: Path) -> None:
self.project_root = project_root
self.steps_dir = project_root / ".specify" / "workflows" / "steps"
self.cache_dir = self.steps_dir / ".cache"
def _is_cache_path_safe(self) -> bool:
"""Return False if any component of the cache path is a symlink."""
current = self.project_root
for part in (".specify", "workflows", "steps", ".cache"):
current = current / part
if current.is_symlink():
return False
return True
# -- Catalog resolution -----------------------------------------------
def _validate_catalog_url(self, url: str) -> None:
"""Validate that a catalog URL uses HTTPS (localhost HTTP allowed)."""
from urllib.parse import urlparse
parsed = urlparse(url)
is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1")
if parsed.scheme != "https" and not (
parsed.scheme == "http" and is_localhost
):
raise StepValidationError(
f"Catalog URL must use HTTPS (got {parsed.scheme}://). "
"HTTP is only allowed for localhost."
)
if not parsed.hostname:
raise StepValidationError(
"Catalog URL must be a valid URL with a host."
)
def _load_catalog_config(
self, config_path: Path
) -> list[StepCatalogEntry] | None:
"""Load catalog stack configuration from a YAML file."""
if not config_path.exists():
return None
try:
data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
except (yaml.YAMLError, OSError, UnicodeError) as exc:
raise StepValidationError(
f"Failed to read catalog config {config_path}: {exc}"
) from exc
if not isinstance(data, dict):
raise StepValidationError(
f"Invalid catalog config: expected a mapping, "
f"got {type(data).__name__}"
)
catalogs_data = data.get("catalogs", [])
if not catalogs_data:
return None
if not isinstance(catalogs_data, list):
raise StepValidationError(
f"Invalid catalog config: 'catalogs' must be a list, "
f"got {type(catalogs_data).__name__}"
)
entries: list[StepCatalogEntry] = []
for idx, item in enumerate(catalogs_data):
if not isinstance(item, dict):
raise StepValidationError(
f"Invalid catalog entry at index {idx}: "
f"expected a mapping, got {type(item).__name__}"
)
url = str(item.get("url", "")).strip()
if not url:
continue
self._validate_catalog_url(url)
try:
priority = int(item.get("priority", idx + 1))
except (TypeError, ValueError):
raise StepValidationError(
f"Invalid priority for catalog "
f"'{item.get('name', idx + 1)}': "
f"expected integer, got {item.get('priority')!r}"
)
raw_install = item.get("install_allowed", False)
if isinstance(raw_install, str):
install_allowed = raw_install.strip().lower() in (
"true",
"yes",
"1",
)
else:
install_allowed = bool(raw_install)
entries.append(
StepCatalogEntry(
url=url,
name=str(item.get("name", f"catalog-{idx + 1}")),
priority=priority,
install_allowed=install_allowed,
description=str(item.get("description", "")),
)
)
entries.sort(key=lambda e: e.priority)
if not entries:
raise StepValidationError(
f"Catalog config {config_path} contains {len(catalogs_data)} "
f"entries but none have valid URLs."
)
return entries
def get_active_catalogs(self) -> list[StepCatalogEntry]:
"""Get the ordered list of active step catalogs."""
# 1. Environment variable override
env_url = os.environ.get("SPECKIT_STEP_CATALOG_URL", "").strip()
if env_url:
self._validate_catalog_url(env_url)
return [
StepCatalogEntry(
url=env_url,
name="env-override",
priority=1,
install_allowed=True,
description="From SPECKIT_STEP_CATALOG_URL",
)
]
# 2. Project-level config
project_config = self.project_root / ".specify" / "step-catalogs.yml"
project_entries = self._load_catalog_config(project_config)
if project_entries is not None:
return project_entries
# 3. User-level config
home = Path.home()
user_config = home / ".specify" / "step-catalogs.yml"
user_entries = self._load_catalog_config(user_config)
if user_entries is not None:
return user_entries
# 4. Built-in defaults
return [
StepCatalogEntry(
url=self.DEFAULT_CATALOG_URL,
name="default",
priority=1,
install_allowed=True,
description="Official step types",
),
StepCatalogEntry(
url=self.COMMUNITY_CATALOG_URL,
name="community",
priority=2,
install_allowed=False,
description="Community-contributed step types (discovery only)",
),
]
# -- Caching ----------------------------------------------------------
def _get_cache_paths(self, url: str) -> tuple[Path, Path]:
"""Get cache file paths for a URL (hash-based)."""
url_hash = hashlib.sha256(url.encode()).hexdigest()[:16]
cache_file = self.cache_dir / f"step-catalog-{url_hash}.json"
meta_file = self.cache_dir / f"step-catalog-{url_hash}-meta.json"
return cache_file, meta_file
def _is_url_cache_valid(self, url: str) -> bool:
"""Check if cached data for a URL is still fresh."""
_, meta_file = self._get_cache_paths(url)
if not meta_file.exists():
return False
try:
with open(meta_file, encoding="utf-8") as f:
meta = json.load(f)
fetched_at = float(meta.get("fetched_at", 0))
return (time.time() - fetched_at) < self.CACHE_DURATION
except (json.JSONDecodeError, OSError, TypeError, ValueError):
return False
def _fetch_single_catalog(
self, entry: StepCatalogEntry, force_refresh: bool = False
) -> dict[str, Any]:
"""Fetch a single catalog, using cache when possible."""
cache_safe = self._is_cache_path_safe()
cache_file, meta_file = self._get_cache_paths(entry.url)
if cache_safe and not force_refresh and self._is_url_cache_valid(entry.url):
try:
with open(cache_file, encoding="utf-8") as f:
cached = json.load(f)
if isinstance(cached, dict):
return cached
except (json.JSONDecodeError, OSError):
# Ignore invalid/unreadable cache and fall back to fetching from source.
pass
from urllib.parse import urlparse
from specify_cli.authentication.http import open_url as _open_url
def _validate_url(url: str) -> None:
parsed = urlparse(url)
is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1")
if parsed.scheme != "https" and not (
parsed.scheme == "http" and is_localhost
):
raise StepCatalogError(
f"Refusing to fetch catalog from non-HTTPS URL: {url}"
)
if not parsed.hostname:
raise StepCatalogError(
f"Refusing to fetch catalog from URL with no hostname: {url}"
)
_validate_url(entry.url)
try:
with _open_url(entry.url, timeout=30) as resp:
_validate_url(resp.geturl())
data = json.loads(resp.read().decode("utf-8"))
except Exception as exc:
if cache_safe and cache_file.exists():
try:
with open(cache_file, encoding="utf-8") as f:
cached = json.load(f)
if isinstance(cached, dict):
return cached
except (json.JSONDecodeError, ValueError, OSError):
# Stale-cache read failed; let the original fetch error propagate.
pass
raise StepCatalogError(
f"Failed to fetch catalog from {entry.url}: {exc}"
) from exc
if not isinstance(data, dict):
raise StepCatalogError(
f"Catalog from {entry.url} is not a valid JSON object."
)
if cache_safe:
try:
self.cache_dir.mkdir(parents=True, exist_ok=True)
with open(cache_file, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
with open(meta_file, "w", encoding="utf-8") as f:
json.dump({"url": entry.url, "fetched_at": time.time()}, f)
except OSError:
pass # Proceed without caching if disk write fails
return data
def _get_merged_steps(
self, force_refresh: bool = False
) -> dict[str, dict[str, Any]]:
"""Merge steps from all active catalogs (lower priority number wins)."""
catalogs = self.get_active_catalogs()
merged: dict[str, dict[str, Any]] = {}
fetch_errors = 0
for entry in reversed(catalogs):
try:
data = self._fetch_single_catalog(entry, force_refresh)
except StepCatalogError:
fetch_errors += 1
continue
steps = data.get("steps", {})
if isinstance(steps, dict):
for step_id, step_data in steps.items():
if not isinstance(step_data, dict):
continue
step_data["_catalog_name"] = entry.name
step_data["_install_allowed"] = entry.install_allowed
merged[step_id] = step_data
elif isinstance(steps, list):
for step_data in steps:
if not isinstance(step_data, dict):
continue
raw_step_id = step_data.get("id")
if raw_step_id is None:
continue
step_id = str(raw_step_id).strip()
if step_id:
step_data["id"] = step_id
step_data["_catalog_name"] = entry.name
step_data["_install_allowed"] = entry.install_allowed
merged[step_id] = step_data
if fetch_errors == len(catalogs) and catalogs:
raise StepCatalogError("All configured step catalogs failed to fetch.")
return merged
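The "lower priority number wins" rule in ``_get_merged_steps`` comes from iterating the catalogs in reverse priority order, so that higher-precedence entries are written last and overwrite earlier ones. A minimal sketch with invented in-memory catalogs (no fetching or caching) shows the effect; the sample data and plain-dict shape are assumptions made for illustration.

```python
from typing import Any

# Illustrative catalogs, already fetched; lower "priority" means higher precedence.
catalogs: list[dict[str, Any]] = [
    {"name": "default", "priority": 1, "steps": {"lint": {"version": "1.0"}}},
    {
        "name": "community",
        "priority": 2,
        "steps": {"lint": {"version": "0.9"}, "deploy": {"version": "2.0"}},
    },
]

merged: dict[str, dict[str, Any]] = {}
# Visit the lowest-precedence catalog first so later writes override it.
for catalog in reversed(sorted(catalogs, key=lambda c: c["priority"])):
    for step_id, step_data in catalog["steps"].items():
        merged[step_id] = {**step_data, "_catalog_name": catalog["name"]}

print(merged["lint"])    # taken from "default"
print(merged["deploy"])  # only defined in "community"
```

Unlike the method in the diff, this sketch copies each step dict instead of annotating it in place, which keeps the sample input unchanged between runs.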
# -- Public API -------------------------------------------------------
def search(
self,
query: str | None = None,
) -> list[dict[str, Any]]:
"""Search step types across all configured catalogs."""
merged = self._get_merged_steps()
results: list[dict[str, Any]] = []
for step_id, step_data in merged.items():
step_data.setdefault("id", step_id)
if query:
q = query.lower()
searchable = " ".join(
[
str(step_data.get("name") or ""),
str(step_data.get("description") or ""),
str(step_data.get("id") or ""),
]
).lower()
if q not in searchable:
continue
results.append(step_data)
return results
def get_step_info(self, step_id: str) -> dict[str, Any] | None:
"""Get details for a specific step from the catalog."""
merged = self._get_merged_steps()
step = merged.get(step_id)
if step:
step.setdefault("id", step_id)
return step
def get_catalog_configs(self) -> list[dict[str, Any]]:
"""Return current catalog configuration as a list of dicts."""
entries = self.get_active_catalogs()
return [
{
"name": e.name,
"url": e.url,
"priority": e.priority,
"install_allowed": e.install_allowed,
"description": e.description,
}
for e in entries
]
def add_catalog(self, url: str, name: str | None = None) -> None:
"""Add a catalog source to the project-level config."""
self._validate_catalog_url(url)
config_path = self.project_root / ".specify" / "step-catalogs.yml"
data: dict[str, Any] = {"catalogs": []}
if config_path.exists():
try:
raw = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
except (yaml.YAMLError, OSError, UnicodeDecodeError) as exc:
raise StepValidationError(
f"Catalog config file is unreadable or malformed: {exc}"
) from exc
if not isinstance(raw, dict):
raise StepValidationError(
"Catalog config file is corrupted (expected a mapping)."
)
data = raw
catalogs = data.get("catalogs", [])
if not isinstance(catalogs, list):
raise StepValidationError(
"Catalog config 'catalogs' must be a list."
)
for cat in catalogs:
if isinstance(cat, dict) and cat.get("url") == url:
raise StepValidationError(
f"Catalog URL already configured: {url}"
)
# Coerce existing priorities to int with a safe fallback so a user-edited
# step-catalogs.yml with a non-integer priority (e.g. "1") doesn't blow up.
def _coerce_priority(value: Any) -> int:
try:
return int(value)
except (TypeError, ValueError):
return 0
max_priority = max(
(
_coerce_priority(cat.get("priority", 0))
for cat in catalogs
if isinstance(cat, dict)
),
default=0,
)
catalogs.append(
{
"name": name or f"catalog-{len(catalogs) + 1}",
"url": url,
"priority": max_priority + 1,
"install_allowed": True,
"description": "",
}
)
data["catalogs"] = catalogs
try:
config_path.parent.mkdir(parents=True, exist_ok=True)
with open(config_path, "w", encoding="utf-8") as f:
yaml.dump(
data, f, default_flow_style=False, sort_keys=False, allow_unicode=True
)
except OSError as exc:
raise StepValidationError(
f"Failed to write catalog config {config_path}: {exc}"
) from exc
def remove_catalog(self, index: int) -> str:
"""Remove a catalog source by index (0-based). Returns the removed name."""
config_path = self.project_root / ".specify" / "step-catalogs.yml"
if not config_path.exists():
raise StepValidationError("No step catalog config file found.")
try:
data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
except (yaml.YAMLError, OSError, UnicodeDecodeError) as exc:
raise StepValidationError(
f"Catalog config file is unreadable or malformed: {exc}"
) from exc
if not isinstance(data, dict):
raise StepValidationError(
"Catalog config file is corrupted (expected a mapping)."
)
catalogs = data.get("catalogs", [])
if not isinstance(catalogs, list):
raise StepValidationError(
"Catalog config 'catalogs' must be a list."
)
if index < 0 or index >= len(catalogs):
raise StepValidationError(
f"Catalog index {index} out of range (0-{len(catalogs) - 1})."
)
removed = catalogs.pop(index)
data["catalogs"] = catalogs
try:
with open(config_path, "w", encoding="utf-8") as f:
yaml.dump(
data, f, default_flow_style=False, sort_keys=False, allow_unicode=True
)
except OSError as exc:
raise StepValidationError(
f"Failed to write catalog config {config_path}: {exc}"
) from exc
if isinstance(removed, dict):
return removed.get("name", f"catalog-{index + 1}")