Files
github-spec-kit/src/specify_cli/workflows/catalog.py
Copilot c52ccd7dc7 Add workflow step catalog — community-installable step types (#2394)
* Initial plan

* Add workflow step catalog: StepRegistry, StepCatalog, CLI commands, and tests

Agent-Logs-Url: https://github.com/github/spec-kit/sessions/2885e646-477d-4df8-b9a3-06d8cb29e748

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>

* Potential fix for pull request finding 'An assert statement has a side-effect'

Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>

* Address PR review: path traversal, cache robustness, collision check, failed-to-load display

- Add resolve()+relative_to() path traversal guards in workflow_step_add and
  workflow_step_remove to prevent directory escape via step_id
- Harden _is_url_cache_valid in both StepCatalog and WorkflowCatalog to
  coerce fetched_at to float and catch TypeError/ValueError
- Check STEP_REGISTRY and StepRegistry before installing to prevent
  collisions with built-in step types or already-installed steps
- Show 'Custom (installed, failed to load)' section in workflow step list
  for steps in the registry that failed to load into STEP_REGISTRY

* Fix StepRegistry shape validation and StepCatalog empty-YAML handling

Agent-Logs-Url: https://github.com/github/spec-kit/sessions/0dca6393-f5a9-40de-bb5c-77ba6af033d2

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>

* Polish: rename _default to default_registry, strengthen unreadable-file test

Agent-Logs-Url: https://github.com/github/spec-kit/sessions/0dca6393-f5a9-40de-bb5c-77ba6af033d2

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>

* Address PR review: atomic install, hostname validation, cache resilience, no dynamic imports in list/info

Agent-Logs-Url: https://github.com/github/spec-kit/sessions/3e18fef0-e2e6-4b3e-9e8d-9adb1e5e464e

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>

* Fix shutil.move with existing step_dir: remove before move to avoid subdirectory nesting

Agent-Logs-Url: https://github.com/github/spec-kit/sessions/3e18fef0-e2e6-4b3e-9e8d-9adb1e5e464e

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>

* Call load_custom_steps at execution time; enforce hostname in _safe_fetch and _validate_url

Agent-Logs-Url: https://github.com/github/spec-kit/sessions/73865880-fb25-4061-a43e-4e4b4d1c4de6

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>

* Wrap YAML parsing in try/except; atomic step install via os.rename() under same fs

Agent-Logs-Url: https://github.com/github/spec-kit/sessions/ff915bc5-ec7e-4e6a-b505-35f5795250df

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>

* Validate YAML root is a dict in _load_catalog_config and workflow_step_add; fix WorkflowCatalog hostname validation

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>

* Fix load_custom_steps() package imports and add reserved step ID validation

* Move _re/_sys imports out of loop and _RESERVED_STEP_IDS to module level

* Address review: collision-resistant module names, extra_files support, remove orphan dir

* Harden extra_files: warn on non-dict, resolve symlinks in path traversal check

* Switch _safe_fetch and StepCatalog._fetch_single_catalog to use open_url for auth consistency

* Harden step_id validation against path-segment tricks; raise on StepRegistry.save() OSError

* Clean up sys.modules on broken step packages; handle StepValidationError in step add/remove

* Address review thread: int-coerce priorities, sys.modules cleanup, _require_specify_project, registry-first remove

* fix: normalize workflow step catalog metadata fallbacks

* fix: address latest workflow step and catalog review findings

* Handle non-string extra_files keys in workflow step add

* Harden StepRegistry symlink reads and extra_files path/URL validation

* Harden custom step loader and step remove against symlinks and OSError

* Fix StepCatalog.search() to coerce non-string fields before joining

* Fix WorkflowCatalog YAML parsing error handling and isinstance checks

* Harden step registry save and custom step/catalog ID handling

* Harden cache validation and staging OSError handling

* Address review: reorder symlink guard and split mixed test

- Move symlink-parent check before is_dir() in load_custom_steps() so
  we never stat an external target through a symlink
- Split test_get_merged_steps_normalizes_list_ids_to_strings into two
  focused tests: one for list-id normalization, one for get_step_info
  return values

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address review: symlink-before-stat in loader, restore registry on rmtree failure

- load_custom_steps(): check is_symlink() before is_dir() on step
  directories so symlinked entries are skipped without statting external
  targets
- workflow_step_remove: restore the registry entry when shutil.rmtree()
  fails so filesystem and registry state stay consistent and a future
  'step add' isn't blocked

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Harden step_id validation and file-write error handling

- _validate_step_id_or_exit: reject whitespace-only/padded IDs,
  Windows-invalid characters (<>:"|?*), control characters, trailing
  dots/spaces, and Windows reserved device names (con, nul, etc.)
- Wrap step.yml/__init__.py staging writes in OSError handler
- Wrap extra_files disk writes (mkdir + write_bytes) in OSError handler
  that names the failing relative path
- Registry rollback on rmtree failure: restore verbatim metadata and
  emit a warning if the restore itself fails

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address review: cache symlink guard, verbatim registry rollback, Windows test fix

- StepCatalog: add _is_cache_path_safe() guard that checks for symlinks
  in .specify/workflows/steps/.cache path; skip cache reads and writes
  when any component is symlinked to prevent writes outside project root
- Registry rollback: write metadata directly to registry.data['steps']
  and call save() instead of using add() which overwrites timestamps
- temp_dir fixture: use ignore_errors=True on Windows to avoid flaky
  teardown from locked file handles (WinError 32)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Simplify exec_module call by removing redundant nested try/except

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix empty YAML tolerance in WorkflowCatalog.add_catalog, scope ignore_errors to Windows

- WorkflowCatalog.add_catalog(): treat None from yaml.safe_load() (empty
  file) as an empty mapping instead of raising 'corrupted'
- temp_dir fixture: limit ignore_errors to sys.platform == 'win32' so
  real cleanup issues surface on non-Windows platforms

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Chain exceptions in _load_catalog_config for both catalog classes

Add 'from exc' to preserve root cause in tracebacks while keeping
clean user-facing messages.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Make default catalog tests hermetic by isolating HOME

Monkeypatch Path.home() to project_dir and clear catalog env vars so
tests don't break on machines with a real ~/.specify/step-catalogs.yml
or ~/.specify/workflow-catalogs.yml.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix falsy ID handling in _get_merged_steps for list-based catalogs

Check for None explicitly instead of using 'or' which drops valid
falsy IDs like 0.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Compare reserved step IDs case-insensitively for filesystem safety

On case-insensitive filesystems (Windows, common macOS), variants like
STEP-REGISTRY.JSON would collide with the actual registry file.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Add explanatory comments to intentional empty except blocks

Document why cache-read failures are silently ignored in both
WorkflowCatalog and StepCatalog _fetch_single_catalog methods.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
Co-authored-by: Manfred Riem <mnriem@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-06-16 18:03:45 -05:00

1200 lines
45 KiB
Python

"""Workflow catalog — discovery, install, and management of workflows and step types.
Mirrors the existing extension/preset catalog pattern with:
- Multi-catalog stack (env var → project → user → built-in)
- SHA256-hashed per-URL caching with 1-hour TTL
- Workflow registry for installed workflow tracking
- Step registry for installed custom step type tracking
- Search across all configured catalog sources
"""
from __future__ import annotations
import hashlib
import json
import os
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import yaml
# ---------------------------------------------------------------------------
# Errors
# ---------------------------------------------------------------------------
class WorkflowCatalogError(Exception):
"""Base error for workflow catalog operations."""
class WorkflowValidationError(WorkflowCatalogError):
"""Validation error for catalog config or workflow data."""
# ---------------------------------------------------------------------------
# CatalogEntry
# ---------------------------------------------------------------------------
@dataclass
class WorkflowCatalogEntry:
"""Represents a single catalog source in the catalog stack."""
url: str
name: str
priority: int
install_allowed: bool
description: str = ""
# ---------------------------------------------------------------------------
# WorkflowRegistry
# ---------------------------------------------------------------------------
class WorkflowRegistry:
"""Manages the registry of installed workflows.
Tracks installed workflows and their metadata in
``.specify/workflows/workflow-registry.json``.
"""
REGISTRY_FILE = "workflow-registry.json"
SCHEMA_VERSION = "1.0"
def __init__(self, project_root: Path) -> None:
self.project_root = project_root
self.workflows_dir = project_root / ".specify" / "workflows"
self.registry_path = self.workflows_dir / self.REGISTRY_FILE
self.data = self._load()
def _load(self) -> dict[str, Any]:
"""Load registry from disk or create default."""
if self.registry_path.exists():
try:
with open(self.registry_path, encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, ValueError):
# Corrupted registry file — reset to default
return {"schema_version": self.SCHEMA_VERSION, "workflows": {}}
return {"schema_version": self.SCHEMA_VERSION, "workflows": {}}
def save(self) -> None:
"""Persist registry to disk."""
self.workflows_dir.mkdir(parents=True, exist_ok=True)
with open(self.registry_path, "w", encoding="utf-8") as f:
json.dump(self.data, f, indent=2)
def add(self, workflow_id: str, metadata: dict[str, Any]) -> None:
"""Add or update an installed workflow entry."""
from datetime import datetime, timezone
existing = self.data["workflows"].get(workflow_id, {})
metadata["installed_at"] = existing.get(
"installed_at", datetime.now(timezone.utc).isoformat()
)
metadata["updated_at"] = datetime.now(timezone.utc).isoformat()
self.data["workflows"][workflow_id] = metadata
self.save()
def remove(self, workflow_id: str) -> bool:
"""Remove an installed workflow entry. Returns True if found."""
if workflow_id in self.data["workflows"]:
del self.data["workflows"][workflow_id]
self.save()
return True
return False
def get(self, workflow_id: str) -> dict[str, Any] | None:
"""Get metadata for an installed workflow."""
return self.data["workflows"].get(workflow_id)
def list(self) -> dict[str, dict[str, Any]]:
"""Return all installed workflows."""
return dict(self.data["workflows"])
def is_installed(self, workflow_id: str) -> bool:
"""Check if a workflow is installed."""
return workflow_id in self.data["workflows"]
# ---------------------------------------------------------------------------
# WorkflowCatalog
# ---------------------------------------------------------------------------
class WorkflowCatalog:
"""Manages workflow catalog fetching, caching, and searching.
Resolution order for catalog sources:
1. ``SPECKIT_WORKFLOW_CATALOG_URL`` env var (overrides all)
2. Project-level ``.specify/workflow-catalogs.yml``
3. User-level ``~/.specify/workflow-catalogs.yml``
4. Built-in defaults (official + community)
"""
DEFAULT_CATALOG_URL = (
"https://raw.githubusercontent.com/github/spec-kit/main/"
"workflows/catalog.json"
)
COMMUNITY_CATALOG_URL = (
"https://raw.githubusercontent.com/github/spec-kit/main/"
"workflows/catalog.community.json"
)
CACHE_DURATION = 3600 # 1 hour
def __init__(self, project_root: Path) -> None:
self.project_root = project_root
self.workflows_dir = project_root / ".specify" / "workflows"
self.cache_dir = self.workflows_dir / ".cache"
# -- Catalog resolution -----------------------------------------------
def _validate_catalog_url(self, url: str) -> None:
"""Validate that a catalog URL uses HTTPS (localhost HTTP allowed)."""
from urllib.parse import urlparse
parsed = urlparse(url)
is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1")
if parsed.scheme != "https" and not (
parsed.scheme == "http" and is_localhost
):
raise WorkflowValidationError(
f"Catalog URL must use HTTPS (got {parsed.scheme}://). "
"HTTP is only allowed for localhost."
)
if not parsed.hostname:
raise WorkflowValidationError(
"Catalog URL must be a valid URL with a host."
)
def _load_catalog_config(
self, config_path: Path
) -> list[WorkflowCatalogEntry] | None:
"""Load catalog stack configuration from a YAML file."""
if not config_path.exists():
return None
try:
data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
except (yaml.YAMLError, OSError, UnicodeError) as exc:
raise WorkflowValidationError(
f"Failed to read catalog config {config_path}: {exc}"
) from exc
if not isinstance(data, dict):
raise WorkflowValidationError(
f"Invalid catalog config: expected a mapping, "
f"got {type(data).__name__}"
)
catalogs_data = data.get("catalogs", [])
if not catalogs_data:
# Empty catalogs list (e.g. after removing last entry)
# is valid — fall back to built-in defaults.
return None
if not isinstance(catalogs_data, list):
raise WorkflowValidationError(
f"Invalid catalog config: 'catalogs' must be a list, "
f"got {type(catalogs_data).__name__}"
)
entries: list[WorkflowCatalogEntry] = []
for idx, item in enumerate(catalogs_data):
if not isinstance(item, dict):
raise WorkflowValidationError(
f"Invalid catalog entry at index {idx}: "
f"expected a mapping, got {type(item).__name__}"
)
url = str(item.get("url", "")).strip()
if not url:
continue
self._validate_catalog_url(url)
try:
priority = int(item.get("priority", idx + 1))
except (TypeError, ValueError):
raise WorkflowValidationError(
f"Invalid priority for catalog "
f"'{item.get('name', idx + 1)}': "
f"expected integer, got {item.get('priority')!r}"
)
raw_install = item.get("install_allowed", False)
if isinstance(raw_install, str):
install_allowed = raw_install.strip().lower() in (
"true",
"yes",
"1",
)
else:
install_allowed = bool(raw_install)
entries.append(
WorkflowCatalogEntry(
url=url,
name=str(item.get("name", f"catalog-{idx + 1}")),
priority=priority,
install_allowed=install_allowed,
description=str(item.get("description", "")),
)
)
entries.sort(key=lambda e: e.priority)
if not entries:
raise WorkflowValidationError(
f"Catalog config {config_path} contains {len(catalogs_data)} "
f"entries but none have valid URLs."
)
return entries
def get_active_catalogs(self) -> list[WorkflowCatalogEntry]:
"""Get the ordered list of active catalogs."""
# 1. Environment variable override
env_url = os.environ.get("SPECKIT_WORKFLOW_CATALOG_URL", "").strip()
if env_url:
self._validate_catalog_url(env_url)
return [
WorkflowCatalogEntry(
url=env_url,
name="env-override",
priority=1,
install_allowed=True,
description="From SPECKIT_WORKFLOW_CATALOG_URL",
)
]
# 2. Project-level config
project_config = self.project_root / ".specify" / "workflow-catalogs.yml"
project_entries = self._load_catalog_config(project_config)
if project_entries is not None:
return project_entries
# 3. User-level config
home = Path.home()
user_config = home / ".specify" / "workflow-catalogs.yml"
user_entries = self._load_catalog_config(user_config)
if user_entries is not None:
return user_entries
# 4. Built-in defaults
return [
WorkflowCatalogEntry(
url=self.DEFAULT_CATALOG_URL,
name="default",
priority=1,
install_allowed=True,
description="Official workflows",
),
WorkflowCatalogEntry(
url=self.COMMUNITY_CATALOG_URL,
name="community",
priority=2,
install_allowed=False,
description="Community-contributed workflows (discovery only)",
),
]
# -- Caching ----------------------------------------------------------
def _get_cache_paths(self, url: str) -> tuple[Path, Path]:
"""Get cache file paths for a URL (hash-based)."""
url_hash = hashlib.sha256(url.encode()).hexdigest()[:16]
cache_file = self.cache_dir / f"workflow-catalog-{url_hash}.json"
meta_file = self.cache_dir / f"workflow-catalog-{url_hash}-meta.json"
return cache_file, meta_file
def _is_url_cache_valid(self, url: str) -> bool:
"""Check if cached data for a URL is still fresh."""
_, meta_file = self._get_cache_paths(url)
if not meta_file.exists():
return False
try:
with open(meta_file, encoding="utf-8") as f:
meta = json.load(f)
fetched_at = float(meta.get("fetched_at", 0))
return (time.time() - fetched_at) < self.CACHE_DURATION
except (json.JSONDecodeError, OSError, TypeError, ValueError):
return False
def _fetch_single_catalog(
self, entry: WorkflowCatalogEntry, force_refresh: bool = False
) -> dict[str, Any]:
"""Fetch a single catalog, using cache when possible."""
cache_file, meta_file = self._get_cache_paths(entry.url)
if not force_refresh and self._is_url_cache_valid(entry.url):
try:
with open(cache_file, encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
# Ignore invalid/unreadable cache and fall back to fetching from source.
pass
# Fetch from URL — validate scheme before opening and after redirects
from urllib.parse import urlparse
from specify_cli.authentication.http import open_url as _open_url
def _validate_catalog_url(url: str) -> None:
parsed = urlparse(url)
is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1")
if parsed.scheme != "https" and not (
parsed.scheme == "http" and is_localhost
):
raise WorkflowCatalogError(
f"Refusing to fetch catalog from non-HTTPS URL: {url}"
)
if not parsed.hostname:
raise WorkflowCatalogError(
f"Refusing to fetch catalog from URL with no hostname: {url}"
)
_validate_catalog_url(entry.url)
try:
with _open_url(entry.url, timeout=30) as resp:
_validate_catalog_url(resp.geturl())
data = json.loads(resp.read().decode("utf-8"))
except Exception as exc:
# Fall back to cache if available
if cache_file.exists():
try:
with open(cache_file, encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, ValueError, OSError):
# Stale-cache read failed; let the original fetch error propagate.
pass
raise WorkflowCatalogError(
f"Failed to fetch catalog from {entry.url}: {exc}"
) from exc
if not isinstance(data, dict):
raise WorkflowCatalogError(
f"Catalog from {entry.url} is not a valid JSON object."
)
# Write cache
try:
self.cache_dir.mkdir(parents=True, exist_ok=True)
with open(cache_file, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
with open(meta_file, "w", encoding="utf-8") as f:
json.dump({"url": entry.url, "fetched_at": time.time()}, f)
except OSError:
pass # Proceed without caching if disk write fails
return data
def _get_merged_workflows(
self, force_refresh: bool = False
) -> dict[str, dict[str, Any]]:
"""Merge workflows from all active catalogs (lower priority number wins)."""
catalogs = self.get_active_catalogs()
merged: dict[str, dict[str, Any]] = {}
fetch_errors = 0
# Process later/higher-numbered entries first so earlier/lower-numbered
# entries overwrite them on workflow ID conflicts.
for entry in reversed(catalogs):
try:
data = self._fetch_single_catalog(entry, force_refresh)
except WorkflowCatalogError:
fetch_errors += 1
continue
workflows = data.get("workflows", {})
# Handle both dict and list formats
if isinstance(workflows, dict):
for wf_id, wf_data in workflows.items():
if not isinstance(wf_data, dict):
continue
wf_data["_catalog_name"] = entry.name
wf_data["_install_allowed"] = entry.install_allowed
merged[wf_id] = wf_data
elif isinstance(workflows, list):
for wf_data in workflows:
if not isinstance(wf_data, dict):
continue
wf_id = wf_data.get("id", "")
if wf_id:
wf_data["_catalog_name"] = entry.name
wf_data["_install_allowed"] = entry.install_allowed
merged[wf_id] = wf_data
if fetch_errors == len(catalogs) and catalogs:
raise WorkflowCatalogError(
"All configured catalogs failed to fetch."
)
return merged
# -- Public API -------------------------------------------------------
def search(
self,
query: str | None = None,
tag: str | None = None,
) -> list[dict[str, Any]]:
"""Search workflows across all configured catalogs."""
merged = self._get_merged_workflows()
results: list[dict[str, Any]] = []
for wf_id, wf_data in merged.items():
wf_data.setdefault("id", wf_id)
if query:
q = query.lower()
searchable = " ".join(
[
wf_data.get("name", ""),
wf_data.get("description", ""),
wf_data.get("id", ""),
]
).lower()
if q not in searchable:
continue
if tag:
raw_tags = wf_data.get("tags", [])
tags = raw_tags if isinstance(raw_tags, list) else []
normalized_tags = [t.lower() for t in tags if isinstance(t, str)]
if tag.lower() not in normalized_tags:
continue
results.append(wf_data)
return results
def get_workflow_info(self, workflow_id: str) -> dict[str, Any] | None:
"""Get details for a specific workflow from the catalog."""
merged = self._get_merged_workflows()
wf = merged.get(workflow_id)
if wf:
wf.setdefault("id", workflow_id)
return wf
def get_catalog_configs(self) -> list[dict[str, Any]]:
"""Return current catalog configuration as a list of dicts."""
entries = self.get_active_catalogs()
return [
{
"name": e.name,
"url": e.url,
"priority": e.priority,
"install_allowed": e.install_allowed,
"description": e.description,
}
for e in entries
]
def add_catalog(self, url: str, name: str | None = None) -> None:
"""Add a catalog source to the project-level config."""
self._validate_catalog_url(url)
config_path = self.project_root / ".specify" / "workflow-catalogs.yml"
data: dict[str, Any] = {"catalogs": []}
if config_path.exists():
try:
raw = yaml.safe_load(config_path.read_text(encoding="utf-8"))
except (yaml.YAMLError, OSError, UnicodeDecodeError) as exc:
raise WorkflowValidationError(
f"Catalog config file is unreadable or malformed: {exc}"
) from exc
if raw is None:
raw = {"catalogs": []}
if not isinstance(raw, dict):
raise WorkflowValidationError(
"Catalog config file is corrupted (expected a mapping)."
)
data = raw
catalogs = data.get("catalogs", [])
if not isinstance(catalogs, list):
raise WorkflowValidationError(
"Catalog config 'catalogs' must be a list."
)
# Check for duplicate URL (guard against non-dict entries)
for cat in catalogs:
if isinstance(cat, dict) and cat.get("url") == url:
raise WorkflowValidationError(
f"Catalog URL already configured: {url}"
)
# Derive priority from the highest existing priority + 1.
# Coerce existing priorities to int with a safe fallback so a user-edited
# workflow-catalogs.yml with a non-integer priority (e.g. "1") doesn't blow up.
def _coerce_priority(value: Any) -> int:
try:
return int(value)
except (TypeError, ValueError):
return 0
max_priority = max(
(
_coerce_priority(cat.get("priority", 0))
for cat in catalogs
if isinstance(cat, dict)
),
default=0,
)
catalogs.append(
{
"name": name or f"catalog-{len(catalogs) + 1}",
"url": url,
"priority": max_priority + 1,
"install_allowed": True,
"description": "",
}
)
data["catalogs"] = catalogs
try:
config_path.parent.mkdir(parents=True, exist_ok=True)
with open(config_path, "w", encoding="utf-8") as f:
yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True)
except OSError as exc:
raise WorkflowValidationError(
f"Failed to write catalog config {config_path}: {exc}"
) from exc
def remove_catalog(self, index: int) -> str:
"""Remove a catalog source by index (0-based). Returns the removed name."""
config_path = self.project_root / ".specify" / "workflow-catalogs.yml"
if not config_path.exists():
raise WorkflowValidationError("No catalog config file found.")
try:
data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
except (yaml.YAMLError, OSError, UnicodeDecodeError) as exc:
raise WorkflowValidationError(
f"Catalog config file is unreadable or malformed: {exc}"
) from exc
if not isinstance(data, dict):
raise WorkflowValidationError(
"Catalog config file is corrupted (expected a mapping)."
)
catalogs = data.get("catalogs", [])
if not isinstance(catalogs, list):
raise WorkflowValidationError(
"Catalog config 'catalogs' must be a list."
)
if index < 0 or index >= len(catalogs):
raise WorkflowValidationError(
f"Catalog index {index} out of range (0-{len(catalogs) - 1})."
)
removed = catalogs.pop(index)
data["catalogs"] = catalogs
try:
with open(config_path, "w", encoding="utf-8") as f:
yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True)
except OSError as exc:
raise WorkflowValidationError(
f"Failed to write catalog config {config_path}: {exc}"
) from exc
if isinstance(removed, dict):
return removed.get("name", f"catalog-{index + 1}")
return f"catalog-{index + 1}"
# ---------------------------------------------------------------------------
# Step catalog errors
# ---------------------------------------------------------------------------
class StepCatalogError(Exception):
"""Base error for step catalog operations."""
class StepValidationError(StepCatalogError):
"""Validation error for step catalog config or step data."""
# ---------------------------------------------------------------------------
# StepCatalogEntry
# ---------------------------------------------------------------------------
@dataclass
class StepCatalogEntry:
"""Represents a single step catalog source in the catalog stack."""
url: str
name: str
priority: int
install_allowed: bool
description: str = ""
# ---------------------------------------------------------------------------
# StepRegistry
# ---------------------------------------------------------------------------
class StepRegistry:
"""Manages the registry of installed custom step types.
Tracks installed step types and their metadata in
``.specify/workflows/steps/step-registry.json``.
"""
REGISTRY_FILE = "step-registry.json"
SCHEMA_VERSION = "1.0"
def __init__(self, project_root: Path) -> None:
self.project_root = project_root
self.steps_dir = project_root / ".specify" / "workflows" / "steps"
self.registry_path = self.steps_dir / self.REGISTRY_FILE
self.data = self._load()
def _has_symlinked_parent(self) -> bool:
"""Return True if any directory under .specify/workflows/steps is a symlink."""
current = self.project_root
for part in (".specify", "workflows", "steps"):
current = current / part
if current.is_symlink():
return True
return False
def _load(self) -> dict[str, Any]:
"""Load registry from disk or create default."""
default_registry: dict[str, Any] = {"schema_version": self.SCHEMA_VERSION, "steps": {}}
# Defense-in-depth: refuse to read the registry if any parent directory
# under .specify/workflows/steps is a symlink, which could redirect the
# read outside the project root.
if self._has_symlinked_parent():
return default_registry
# Defense-in-depth: also refuse to read a symlinked registry file,
# which could redirect the read outside the project root.
if self.registry_path.is_symlink():
return default_registry
if self.registry_path.exists():
try:
with open(self.registry_path, encoding="utf-8") as f:
data = json.load(f)
# Validate shape: must be a dict with a dict "steps" field
if not isinstance(data, dict):
return default_registry
if not isinstance(data.get("steps"), dict):
data["steps"] = {}
return data
except (json.JSONDecodeError, ValueError, OSError, UnicodeError):
return default_registry
return default_registry
def save(self) -> None:
"""Persist registry to disk.
Raises ``StepValidationError`` with a clear message on filesystem
errors (read-only fs, permission denied, ...) so callers can surface
a clean error to the user rather than an unhandled ``OSError``.
"""
if self._has_symlinked_parent() or self.registry_path.is_symlink():
raise StepValidationError(
"Refusing to write step registry through a symlinked path."
)
try:
self.steps_dir.mkdir(parents=True, exist_ok=True)
with open(self.registry_path, "w", encoding="utf-8") as f:
json.dump(self.data, f, indent=2)
except OSError as exc:
raise StepValidationError(
f"Failed to write step registry at {self.registry_path}: {exc}"
) from exc
def add(self, step_id: str, metadata: dict[str, Any]) -> None:
"""Add or update an installed step entry."""
import copy
from datetime import datetime, timezone
existing = self.data["steps"].get(step_id, {})
metadata_to_store = copy.deepcopy(metadata)
metadata_to_store["installed_at"] = existing.get(
"installed_at", datetime.now(timezone.utc).isoformat()
)
metadata_to_store["updated_at"] = datetime.now(timezone.utc).isoformat()
self.data["steps"][step_id] = metadata_to_store
self.save()
def remove(self, step_id: str) -> bool:
"""Remove an installed step entry. Returns True if found."""
if step_id in self.data["steps"]:
del self.data["steps"][step_id]
self.save()
return True
return False
def get(self, step_id: str) -> dict[str, Any] | None:
"""Get metadata for an installed step."""
return self.data["steps"].get(step_id)
def list(self) -> dict[str, dict[str, Any]]:
"""Return all installed steps."""
return dict(self.data["steps"])
def is_installed(self, step_id: str) -> bool:
"""Check if a step is installed."""
return step_id in self.data["steps"]
# ---------------------------------------------------------------------------
# StepCatalog
# ---------------------------------------------------------------------------
class StepCatalog:
"""Manages step catalog fetching, caching, and searching.
Resolution order for catalog sources:
1. ``SPECKIT_STEP_CATALOG_URL`` env var (overrides all)
2. Project-level ``.specify/step-catalogs.yml``
3. User-level ``~/.specify/step-catalogs.yml``
4. Built-in defaults (official + community)
"""
DEFAULT_CATALOG_URL = (
"https://raw.githubusercontent.com/github/spec-kit/main/"
"workflows/step-catalog.json"
)
COMMUNITY_CATALOG_URL = (
"https://raw.githubusercontent.com/github/spec-kit/main/"
"workflows/step-catalog.community.json"
)
CACHE_DURATION = 3600 # 1 hour
def __init__(self, project_root: Path) -> None:
self.project_root = project_root
self.steps_dir = project_root / ".specify" / "workflows" / "steps"
self.cache_dir = self.steps_dir / ".cache"
def _is_cache_path_safe(self) -> bool:
"""Return False if any component of the cache path is a symlink."""
current = self.project_root
for part in (".specify", "workflows", "steps", ".cache"):
current = current / part
if current.is_symlink():
return False
return True
# -- Catalog resolution -----------------------------------------------
def _validate_catalog_url(self, url: str) -> None:
"""Validate that a catalog URL uses HTTPS (localhost HTTP allowed)."""
from urllib.parse import urlparse
parsed = urlparse(url)
is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1")
if parsed.scheme != "https" and not (
parsed.scheme == "http" and is_localhost
):
raise StepValidationError(
f"Catalog URL must use HTTPS (got {parsed.scheme}://). "
"HTTP is only allowed for localhost."
)
if not parsed.hostname:
raise StepValidationError(
"Catalog URL must be a valid URL with a host."
)
def _load_catalog_config(
self, config_path: Path
) -> list[StepCatalogEntry] | None:
"""Load catalog stack configuration from a YAML file."""
if not config_path.exists():
return None
try:
data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
except (yaml.YAMLError, OSError, UnicodeError) as exc:
raise StepValidationError(
f"Failed to read catalog config {config_path}: {exc}"
) from exc
if not isinstance(data, dict):
raise StepValidationError(
f"Invalid catalog config: expected a mapping, "
f"got {type(data).__name__}"
)
catalogs_data = data.get("catalogs", [])
if not catalogs_data:
return None
if not isinstance(catalogs_data, list):
raise StepValidationError(
f"Invalid catalog config: 'catalogs' must be a list, "
f"got {type(catalogs_data).__name__}"
)
entries: list[StepCatalogEntry] = []
for idx, item in enumerate(catalogs_data):
if not isinstance(item, dict):
raise StepValidationError(
f"Invalid catalog entry at index {idx}: "
f"expected a mapping, got {type(item).__name__}"
)
url = str(item.get("url", "")).strip()
if not url:
continue
self._validate_catalog_url(url)
try:
priority = int(item.get("priority", idx + 1))
except (TypeError, ValueError):
raise StepValidationError(
f"Invalid priority for catalog "
f"'{item.get('name', idx + 1)}': "
f"expected integer, got {item.get('priority')!r}"
)
raw_install = item.get("install_allowed", False)
if isinstance(raw_install, str):
install_allowed = raw_install.strip().lower() in (
"true",
"yes",
"1",
)
else:
install_allowed = bool(raw_install)
entries.append(
StepCatalogEntry(
url=url,
name=str(item.get("name", f"catalog-{idx + 1}")),
priority=priority,
install_allowed=install_allowed,
description=str(item.get("description", "")),
)
)
entries.sort(key=lambda e: e.priority)
if not entries:
raise StepValidationError(
f"Catalog config {config_path} contains {len(catalogs_data)} "
f"entries but none have valid URLs."
)
return entries
def get_active_catalogs(self) -> list[StepCatalogEntry]:
"""Get the ordered list of active step catalogs."""
# 1. Environment variable override
env_url = os.environ.get("SPECKIT_STEP_CATALOG_URL", "").strip()
if env_url:
self._validate_catalog_url(env_url)
return [
StepCatalogEntry(
url=env_url,
name="env-override",
priority=1,
install_allowed=True,
description="From SPECKIT_STEP_CATALOG_URL",
)
]
# 2. Project-level config
project_config = self.project_root / ".specify" / "step-catalogs.yml"
project_entries = self._load_catalog_config(project_config)
if project_entries is not None:
return project_entries
# 3. User-level config
home = Path.home()
user_config = home / ".specify" / "step-catalogs.yml"
user_entries = self._load_catalog_config(user_config)
if user_entries is not None:
return user_entries
# 4. Built-in defaults
return [
StepCatalogEntry(
url=self.DEFAULT_CATALOG_URL,
name="default",
priority=1,
install_allowed=True,
description="Official step types",
),
StepCatalogEntry(
url=self.COMMUNITY_CATALOG_URL,
name="community",
priority=2,
install_allowed=False,
description="Community-contributed step types (discovery only)",
),
]
# -- Caching ----------------------------------------------------------
def _get_cache_paths(self, url: str) -> tuple[Path, Path]:
"""Get cache file paths for a URL (hash-based)."""
url_hash = hashlib.sha256(url.encode()).hexdigest()[:16]
cache_file = self.cache_dir / f"step-catalog-{url_hash}.json"
meta_file = self.cache_dir / f"step-catalog-{url_hash}-meta.json"
return cache_file, meta_file
def _is_url_cache_valid(self, url: str) -> bool:
"""Check if cached data for a URL is still fresh."""
_, meta_file = self._get_cache_paths(url)
if not meta_file.exists():
return False
try:
with open(meta_file, encoding="utf-8") as f:
meta = json.load(f)
fetched_at = float(meta.get("fetched_at", 0))
return (time.time() - fetched_at) < self.CACHE_DURATION
except (json.JSONDecodeError, OSError, TypeError, ValueError):
return False
def _fetch_single_catalog(
self, entry: StepCatalogEntry, force_refresh: bool = False
) -> dict[str, Any]:
"""Fetch a single catalog, using cache when possible."""
cache_safe = self._is_cache_path_safe()
cache_file, meta_file = self._get_cache_paths(entry.url)
if cache_safe and not force_refresh and self._is_url_cache_valid(entry.url):
try:
with open(cache_file, encoding="utf-8") as f:
cached = json.load(f)
if isinstance(cached, dict):
return cached
except (json.JSONDecodeError, OSError):
# Ignore invalid/unreadable cache and fall back to fetching from source.
pass
from urllib.parse import urlparse
from specify_cli.authentication.http import open_url as _open_url
def _validate_url(url: str) -> None:
parsed = urlparse(url)
is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1")
if parsed.scheme != "https" and not (
parsed.scheme == "http" and is_localhost
):
raise StepCatalogError(
f"Refusing to fetch catalog from non-HTTPS URL: {url}"
)
if not parsed.hostname:
raise StepCatalogError(
f"Refusing to fetch catalog from URL with no hostname: {url}"
)
_validate_url(entry.url)
try:
with _open_url(entry.url, timeout=30) as resp:
_validate_url(resp.geturl())
data = json.loads(resp.read().decode("utf-8"))
except Exception as exc:
if cache_safe and cache_file.exists():
try:
with open(cache_file, encoding="utf-8") as f:
cached = json.load(f)
if isinstance(cached, dict):
return cached
except (json.JSONDecodeError, ValueError, OSError):
# Stale-cache read failed; let the original fetch error propagate.
pass
raise StepCatalogError(
f"Failed to fetch catalog from {entry.url}: {exc}"
) from exc
if not isinstance(data, dict):
raise StepCatalogError(
f"Catalog from {entry.url} is not a valid JSON object."
)
if cache_safe:
try:
self.cache_dir.mkdir(parents=True, exist_ok=True)
with open(cache_file, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
with open(meta_file, "w", encoding="utf-8") as f:
json.dump({"url": entry.url, "fetched_at": time.time()}, f)
except OSError:
pass # Proceed without caching if disk write fails
return data
def _get_merged_steps(
self, force_refresh: bool = False
) -> dict[str, dict[str, Any]]:
"""Merge steps from all active catalogs (lower priority number wins)."""
catalogs = self.get_active_catalogs()
merged: dict[str, dict[str, Any]] = {}
fetch_errors = 0
for entry in reversed(catalogs):
try:
data = self._fetch_single_catalog(entry, force_refresh)
except StepCatalogError:
fetch_errors += 1
continue
steps = data.get("steps", {})
if isinstance(steps, dict):
for step_id, step_data in steps.items():
if not isinstance(step_data, dict):
continue
step_data["_catalog_name"] = entry.name
step_data["_install_allowed"] = entry.install_allowed
merged[step_id] = step_data
elif isinstance(steps, list):
for step_data in steps:
if not isinstance(step_data, dict):
continue
raw_step_id = step_data.get("id")
if raw_step_id is None:
continue
step_id = str(raw_step_id).strip()
if step_id:
step_data["id"] = step_id
step_data["_catalog_name"] = entry.name
step_data["_install_allowed"] = entry.install_allowed
merged[step_id] = step_data
if fetch_errors == len(catalogs) and catalogs:
raise StepCatalogError("All configured step catalogs failed to fetch.")
return merged
# -- Public API -------------------------------------------------------
def search(
self,
query: str | None = None,
) -> list[dict[str, Any]]:
"""Search step types across all configured catalogs."""
merged = self._get_merged_steps()
results: list[dict[str, Any]] = []
for step_id, step_data in merged.items():
step_data.setdefault("id", step_id)
if query:
q = query.lower()
searchable = " ".join(
[
str(step_data.get("name") or ""),
str(step_data.get("description") or ""),
str(step_data.get("id") or ""),
]
).lower()
if q not in searchable:
continue
results.append(step_data)
return results
def get_step_info(self, step_id: str) -> dict[str, Any] | None:
"""Get details for a specific step from the catalog."""
merged = self._get_merged_steps()
step = merged.get(step_id)
if step:
step.setdefault("id", step_id)
return step
def get_catalog_configs(self) -> list[dict[str, Any]]:
"""Return current catalog configuration as a list of dicts."""
entries = self.get_active_catalogs()
return [
{
"name": e.name,
"url": e.url,
"priority": e.priority,
"install_allowed": e.install_allowed,
"description": e.description,
}
for e in entries
]
def add_catalog(self, url: str, name: str | None = None) -> None:
"""Add a catalog source to the project-level config."""
self._validate_catalog_url(url)
config_path = self.project_root / ".specify" / "step-catalogs.yml"
data: dict[str, Any] = {"catalogs": []}
if config_path.exists():
try:
raw = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
except (yaml.YAMLError, OSError, UnicodeDecodeError) as exc:
raise StepValidationError(
f"Catalog config file is unreadable or malformed: {exc}"
) from exc
if not isinstance(raw, dict):
raise StepValidationError(
"Catalog config file is corrupted (expected a mapping)."
)
data = raw
catalogs = data.get("catalogs", [])
if not isinstance(catalogs, list):
raise StepValidationError(
"Catalog config 'catalogs' must be a list."
)
for cat in catalogs:
if isinstance(cat, dict) and cat.get("url") == url:
raise StepValidationError(
f"Catalog URL already configured: {url}"
)
# Coerce existing priorities to int with a safe fallback so a user-edited
# step-catalogs.yml with a non-integer priority (e.g. "1") doesn't blow up.
def _coerce_priority(value: Any) -> int:
try:
return int(value)
except (TypeError, ValueError):
return 0
max_priority = max(
(
_coerce_priority(cat.get("priority", 0))
for cat in catalogs
if isinstance(cat, dict)
),
default=0,
)
catalogs.append(
{
"name": name or f"catalog-{len(catalogs) + 1}",
"url": url,
"priority": max_priority + 1,
"install_allowed": True,
"description": "",
}
)
data["catalogs"] = catalogs
try:
config_path.parent.mkdir(parents=True, exist_ok=True)
with open(config_path, "w", encoding="utf-8") as f:
yaml.dump(
data, f, default_flow_style=False, sort_keys=False, allow_unicode=True
)
except OSError as exc:
raise StepValidationError(
f"Failed to write catalog config {config_path}: {exc}"
) from exc
def remove_catalog(self, index: int) -> str:
"""Remove a catalog source by index (0-based). Returns the removed name."""
config_path = self.project_root / ".specify" / "step-catalogs.yml"
if not config_path.exists():
raise StepValidationError("No step catalog config file found.")
try:
data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
except (yaml.YAMLError, OSError, UnicodeDecodeError) as exc:
raise StepValidationError(
f"Catalog config file is unreadable or malformed: {exc}"
) from exc
if not isinstance(data, dict):
raise StepValidationError(
"Catalog config file is corrupted (expected a mapping)."
)
catalogs = data.get("catalogs", [])
if not isinstance(catalogs, list):
raise StepValidationError(
"Catalog config 'catalogs' must be a list."
)
if index < 0 or index >= len(catalogs):
raise StepValidationError(
f"Catalog index {index} out of range (0-{len(catalogs) - 1})."
)
removed = catalogs.pop(index)
data["catalogs"] = catalogs
try:
with open(config_path, "w", encoding="utf-8") as f:
yaml.dump(
data, f, default_flow_style=False, sort_keys=False, allow_unicode=True
)
except OSError as exc:
raise StepValidationError(
f"Failed to write catalog config {config_path}: {exc}"
) from exc
if isinstance(removed, dict):
return removed.get("name", f"catalog-{index + 1}")
return f"catalog-{index + 1}"