github-spec-kit/src/specify_cli/workflows/catalog.py
Copilot a00e679918 Add workflow engine with catalog system (#2158)
* Initial plan

* Add workflow engine with step registry, expression engine, catalog system, and CLI commands

Agent-Logs-Url: https://github.com/github/spec-kit/sessions/72a7bb5d-071f-4d67-a507-7e1abae2384d

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>

* Add comprehensive tests for workflow engine (94 tests)

Agent-Logs-Url: https://github.com/github/spec-kit/sessions/72a7bb5d-071f-4d67-a507-7e1abae2384d

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>

* Address review feedback: do-while condition preservation and URL scheme validation

Agent-Logs-Url: https://github.com/github/spec-kit/sessions/72a7bb5d-071f-4d67-a507-7e1abae2384d

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>

* Address review feedback, add CLI dispatch, interactive gates, and docs

Review comments (7/7):
- Add explanatory comment to empty except block
- Implement workflow catalog download with cleanup on failure
- Add input type coercion for number/boolean/enum
- Fix example workflow to remove non-existent output references
- Fix while_loop and if_then condition defaults (string 'false' → bool False)
- Fix resume step index tracking with step_offset parameter

CLI dispatch:
- Add build_exec_args() and dispatch_command() to IntegrationBase
- Override for Claude (skills: /speckit-specify), Gemini (-m flag),
  Codex (codex exec), Copilot (--agent speckit.specify)
- CommandStep invokes installed commands by name via integration CLI
- Add PromptStep for arbitrary inline prompts (10th step type)
- Stream CLI output live to terminal (no silent blocking)
- Remove timeout when streaming (user can Ctrl+C)
- Ctrl+C saves state as PAUSED for clean resume

Interactive gates:
- Gate steps prompt [1] approve [2] reject in TTY
- Fall back to PAUSED in non-interactive environments
- Resume re-executes the gate for interactive prompting

Documentation:
- workflows/README.md — user guide
- workflows/ARCHITECTURE.md — internals with Mermaid diagrams
- workflows/PUBLISHING.md — catalog submission guide

Tests: 94 → 122 workflow tests, 1362 total (all passing)

* Fix ruff lint errors: unused imports, f-string placeholders, undefined name

* Address second review: registry-backed validation, shell failures, loop/fan-out execution, URL validation

- VALID_STEP_TYPES now queries STEP_REGISTRY dynamically
- Shell step returns FAILED on non-zero exit code
- Persist workflow YAML in run directory for reliable resume
- Resume loads from run copy, falls back to installed workflow
- Engine iterates while/do-while loops up to max_iterations
- Engine expands fan-out per item with context.item
- HTTPS URL validation for catalog workflow installs (HTTP allowed for localhost)
- Fix catalog merge priority docstring (lower number wins)
- Fix dispatch_command docstring (no build_exec_args_for_command)
- Gate on_reject=retry pauses for re-prompt on resume
- Update docs to 10 step types, add prompt step to tables and README

* Potential fix for pull request finding 'Empty except'

Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>

* Address third review: fan-out IDs, catalog guards, shell coercion, docs

- Fan-out generates unique per-item step IDs and collects results
- Catalog merge skips non-dict workflow entries (malformed data guard)
- Shell step coerces run_cmd to str after expression evaluation
- urlopen timeout=30 for catalog workflow installs
- yaml.dump with sort_keys=False, allow_unicode=True for catalog configs
- Document streaming timeout as intentionally unbounded (user Ctrl+C)
- Document --allow-all-tools as required for non-interactive + future enhancement
- Update test docstring and PUBLISHING.md to 10 step types with prompt

* Validate final URL after redirects in catalog fetch

urlopen follows redirects, so validate the response URL against the
same HTTPS/localhost rules to prevent redirect-based downgrade attacks.
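
A minimal sketch of the redirect check described above, assuming the rule is "HTTPS anywhere, plain HTTP only for localhost". The helper names `is_allowed_url` and `check_fetch` are hypothetical and are not taken from the commit; the idea is only that the same predicate runs on the requested URL and on the URL reported after redirects.

```python
from urllib.parse import urlparse

# Hosts for which plain HTTP is tolerated (assumption mirroring the commit text).
LOCAL_HOSTS = ("localhost", "127.0.0.1", "::1")


def is_allowed_url(url: str) -> bool:
    """Return True for HTTPS URLs with a host, or HTTP URLs on localhost."""
    parsed = urlparse(url)
    if parsed.scheme == "https":
        return bool(parsed.netloc)
    return parsed.scheme == "http" and parsed.hostname in LOCAL_HOSTS


def check_fetch(requested_url: str, final_url: str) -> None:
    """Apply the same rule to the requested URL and the post-redirect URL."""
    for label, url in (("requested", requested_url), ("final", final_url)):
        if not is_allowed_url(url):
            raise ValueError(f"Refusing {label} URL: {url}")
```

With `urlopen`, the post-redirect URL would come from `resp.geturl()`, so a server that redirects an HTTPS request to plain HTTP is rejected by the second check.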

* Address fourth review: filter arg eval, tags normalization, install redirect check

- Filter arguments now evaluated via _evaluate_simple_expression() so
  default(42) returns int not string
- Tags normalized: non-list/non-string values handled gracefully
- Install URL redirect validation (same as catalog fetch)
- Remove unused 'skipped' variable in catalog config parsing
- Author 'github' → 'GitHub' in example workflow
- Document nested step resume limitation (re-runs parent step)

* Add explanatory comment to empty except ValueError block

* Address fifth review: expression parsing, fan-out output, URL install, gate options

- Move string literal parsing before operator detection in expressions
  so quoted strings with operators (e.g. 'a in b') are not mis-parsed
- Fan-out: remove max_concurrency from persisted output, fix docstring
  to reflect sequential execution
- workflow add: support URL sources with HTTPS/redirect validation,
  validate workflow ID is non-empty before writing files
- Deduplicate local install logic via _validate_and_install_local()
- Remove 'edit' gate option from speckit workflow (not implemented)

* Add comments to empty except ValueError blocks in URL install

* Address sixth review: operator precedence, fan_in cleanup, registry resilience, docs

- Fix or/and operator precedence (or parsed first = lower precedence)
- Restore context.fan_in after fan-in step completes
- Catch JSONDecodeError in registry load for corrupted files
- Replace print() with on_step_start callback (library-safe)
- Gate validation warns when on_reject set but no reject option
- Shell step: document shell=True security tradeoff
- README: sdd-pipeline → speckit, parallel → sequential for fan-out
- ARCHITECTURE.md: parallel → fan-out/fan-in in diagram

* Address seventh review: string literal before pipe, type annotations, validate on install

- Move string literal check above pipe filter parsing so 'a | b' works
- Fix type annotations: input_values list[str] | None, run_id str | None
- Run validate_workflow() before installing from local path/URL
- Remove duplicate string literal check from expression parser

* Address eighth review: fan-out namespaced IDs, early return, catalog validation

- Fan-out per-item step IDs use _fanout_{step_id}_{base}_{idx} namespace
  to avoid collisions with user-defined step IDs
- Early return after fan-out loop when state is paused/failed/aborted
- Catalog installs parse + validate downloaded YAML before registering,
  using definition metadata instead of catalog entry for registry

* Address ninth review: populate catalog, fix indentation, priority, README

- Add speckit workflow entry to catalog.json so it's discoverable
- Fix shell step output dict indentation
- Catalog add_catalog priority derived from max existing + 1
- README Quick Start clarified with install + local file examples

* Address tenth review: max_iterations validation, catalog config guard, version alignment

- Validate max_iterations is int >= 1 in while and do-while steps
- Guard add_catalog against corrupted config (non-dict/non-list)
- Align speckit_version requirement to >=0.6.1 (current package version)
- Fan-out template validation uses separate seen_ids set to avoid
  false duplication errors with user-defined step IDs

* Address eleventh review: command step fails without CLI, ID mismatch warning, state persistence

- Command step returns FAILED when CLI not installed (was silent COMPLETED)
- Catalog install warns on workflow ID vs catalog key mismatch
- Engine persists state.save() before returning on unknown step type
- Update tests to expect FAILED for command steps without CLI
- Integration tests use shell steps for CLI-independent execution

* Address twelfth review: type annotations, version examples, streaming docs, requires

- Fix workflow_search type annotations (str | None)
- PUBLISHING.md: speckit_version >=0.15.0 → >=0.6.1
- Document that exit_code is captured and referenceable by later steps
- Mark requires as declared-but-not-enforced (planned enhancement)
- Note full stdout/stderr capture as planned enhancement

* Enforce catalog key matches workflow ID (fail instead of warn)

* Bundle speckit workflow: auto-install during specify init

- Add workflows/speckit to pyproject.toml force-include for wheel builds
- Add _locate_bundled_workflow() helper (mirrors _locate_bundled_extension)
- Auto-install speckit workflow during specify init (after git extension)
- Update all integration file inventory tests to expect workflow files

* Address fourteenth review: prompt fails without CLI, resolved step data, fan-out normalization

- PromptStep returns FAILED when CLI not installed (was silent COMPLETED)
- Engine step_data prefers resolved values from step output
- Fan-out normalizes output.results=[] for empty item lists
- subprocess.run inherits stdout/stderr (no explicit sys.stdout)
- Registry tests use issubset for extensibility

* Address fifteenth review: fan_in docstring, gate defaults, validation guards, reserved prefix

- FanInStep docstring: aggregate-only, no blocking semantics
- FanInStep: guard output_config as dict, handle None
- Gate validate: use same default options as execute
- Validate inputs is dict and steps is list before iterating
- Reserve _fanout_ prefix in step ID validation
- PUBLISHING.md: remove unenforced checklist items, add _fanout_ note

* Address sixteenth review: docs regex, fan_in try/finally, hyphenated dot-path keys

- PUBLISHING.md: update ID regex docs to match implementation (single-char OK)
- FanInStep: wrap expression evaluation in try/finally for context.fan_in
- Expression dot-path: allow hyphens in keys before list index (e.g. run-tests[0])

* Make speckit workflow integration-agnostic, document Copilot CLI requirement

- Workflow integration selectable via input (default: claude)
- Each command step uses {{ inputs.integration }} instead of hardcoded copilot
- Copilot docstring documents CLI requirement for workflow dispatch
- Added install_url for Copilot CLI docs

* Address seventeenth review: project checks, catalog robustness

- Add .specify/ project check to workflow run/resume/status/search/info
- remove_catalog validates config shape (dict + list) before indexing
- _fetch_single_catalog validates response is a dict
- _get_merged_workflows raises when all catalogs fail to fetch
- add_catalog guards against non-dict catalog entries in config

* Address eighteenth review: condition coercion, gate abort result, while default, cache guard, resume log

- evaluate_condition treats plain 'false'/'true' strings as booleans
- Gate abort returns StepResult(FAILED) instead of raising exception
  so step output is persisted in state for inspection
- while_loop max_iterations optional (default 10), validation aligned
- Catalog cache fallback catches invalid JSON gracefully
- resume() appends workflow_finished log entry like execute()

* Address nineteenth review: allow-all-tools opt-in, empty catalogs, abort dead code, while docstring

- --allow-all-tools controlled by SPECKIT_ALLOW_ALL_TOOLS env var (default: 1)
  Set to 0 to disable automatic tool approval for Copilot CLI
- Empty catalogs list falls back to built-in defaults (not an error)
- Remove unreachable WorkflowAbortError catches from execute/resume
  (gate abort now returns StepResult(FAILED) instead of raising)
- while_loop docstring updated: max_iterations is optional (default 10)

* Address twentieth review: gate abort maps to ABORTED status, do-while max_iterations optional

- Engine detects output.aborted from gate step and sets RunStatus.ABORTED
  (was unreachable — gate abort returned FAILED but status was always FAILED)
- do-while max_iterations now optional (default 10), aligned with while_loop
- do-while docstring and validation updated accordingly

* Coerce default_options to dict, align bundled workflow ID regex with validator

* Gate validates string options, prompt uses resolved integration, loop normalizes max_iterations

* Use parentId:childId convention for nested step IDs

- Fan-out per-item IDs use parentId:templateId:index (e.g. parallel:impl:0)
- Reserve ':' in user step IDs (validation rejects)
- Replaces _fanout_ prefix with cleaner namespacing
- Expressions like {{ steps.parallel:impl:0.output.file }} work naturally

* Validate workflow version is semantic versioning (X.Y.Z)

* Schema version validation, strict semver, load_workflow docstring, preserve max_concurrency

- Validate schema_version is '1.0' (reject unknown future schemas)
- Strict semver regex: ^\d+\.\d+\.\d+$ (rejects 1.0.0beta etc.)
- load_workflow docstring: 'parsed' not 'validated'
- Keep max_concurrency in fan-out output (was dropped)
- do_while docstring: engine re-evaluates step_config condition
- ARCHITECTURE.md: document nested resume limitation
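
As an illustration of the strict version rule listed above, the sketch below applies the quoted pattern `^\d+\.\d+\.\d+$`. The function name `is_strict_semver` is hypothetical; only the regular expression itself comes from the commit message.

```python
import re

# Pattern quoted in the commit message: exactly three dot-separated integers.
SEMVER_RE = re.compile(r"^\d+\.\d+\.\d+$")


def is_strict_semver(version: str) -> bool:
    """Return True only for versions of the form X.Y.Z."""
    return SEMVER_RE.match(version) is not None


for candidate in ("1.0.0", "1.0.0beta", "1.0", "v1.0.0"):
    print(candidate, is_strict_semver(candidate))
```

Pre-release and build suffixes allowed by the full SemVer specification are deliberately rejected by this pattern.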

* Path traversal prevention, loop step ID namespacing

- RunState validates run_id is alphanumeric+hyphens (no path separators)
- workflow_add validates catalog source doesn't escape workflows_dir
- Loop iterations namespace nested step IDs as parentId:childId:iteration
  so multiple iterations don't overwrite each other in context/state
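
The two rules above can be sketched as follows. Both helpers are hypothetical: the exact pattern used by `RunState` is not shown in this file, so the character class here is an assumption based on the "alphanumeric+hyphens" wording, and `nested_step_id` only illustrates the `parentId:childId:iteration` convention.

```python
import re

# Assumed pattern: letters, digits, and hyphens only, so no "/" or "..".
RUN_ID_RE = re.compile(r"[A-Za-z0-9-]+")


def is_safe_run_id(run_id: str) -> bool:
    """Return True if run_id cannot contain path separators or dots."""
    return RUN_ID_RE.fullmatch(run_id) is not None


def nested_step_id(parent_id: str, child_id: str, iteration: int) -> str:
    """Build a per-iteration step ID so later iterations do not overwrite earlier ones."""
    return f"{parent_id}:{child_id}:{iteration}"
```

Because `:` is reserved in user step IDs, an ID built this way cannot collide with a user-defined one.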

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
2026-04-14 10:11:56 -05:00


"""Workflow catalog — discovery, install, and management of workflows.
Mirrors the existing extension/preset catalog pattern with:
- Multi-catalog stack (env var → project → user → built-in)
- SHA256-hashed per-URL caching with 1-hour TTL
- Workflow registry for installed workflow tracking
- Search across all configured catalog sources
"""

from __future__ import annotations

import hashlib
import json
import os
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any

import yaml

# ---------------------------------------------------------------------------
# Errors
# ---------------------------------------------------------------------------


class WorkflowCatalogError(Exception):
    """Base error for workflow catalog operations."""


class WorkflowValidationError(WorkflowCatalogError):
    """Validation error for catalog config or workflow data."""


# ---------------------------------------------------------------------------
# CatalogEntry
# ---------------------------------------------------------------------------


@dataclass
class WorkflowCatalogEntry:
    """Represents a single catalog source in the catalog stack."""

    url: str
    name: str
    priority: int
    install_allowed: bool
    description: str = ""


# ---------------------------------------------------------------------------
# WorkflowRegistry
# ---------------------------------------------------------------------------


class WorkflowRegistry:
    """Manages the registry of installed workflows.

    Tracks installed workflows and their metadata in
    ``.specify/workflows/workflow-registry.json``.
    """

    REGISTRY_FILE = "workflow-registry.json"
    SCHEMA_VERSION = "1.0"

    def __init__(self, project_root: Path) -> None:
        self.project_root = project_root
        self.workflows_dir = project_root / ".specify" / "workflows"
        self.registry_path = self.workflows_dir / self.REGISTRY_FILE
        self.data = self._load()

    def _load(self) -> dict[str, Any]:
        """Load registry from disk or create default."""
        if self.registry_path.exists():
            try:
                with open(self.registry_path, encoding="utf-8") as f:
                    return json.load(f)
            except (json.JSONDecodeError, ValueError):
                # Corrupted registry file — reset to default
                return {"schema_version": self.SCHEMA_VERSION, "workflows": {}}
        return {"schema_version": self.SCHEMA_VERSION, "workflows": {}}

    def save(self) -> None:
        """Persist registry to disk."""
        self.workflows_dir.mkdir(parents=True, exist_ok=True)
        with open(self.registry_path, "w", encoding="utf-8") as f:
            json.dump(self.data, f, indent=2)

    def add(self, workflow_id: str, metadata: dict[str, Any]) -> None:
        """Add or update an installed workflow entry."""
        from datetime import datetime, timezone

        existing = self.data["workflows"].get(workflow_id, {})
        metadata["installed_at"] = existing.get(
            "installed_at", datetime.now(timezone.utc).isoformat()
        )
        metadata["updated_at"] = datetime.now(timezone.utc).isoformat()
        self.data["workflows"][workflow_id] = metadata
        self.save()

    def remove(self, workflow_id: str) -> bool:
        """Remove an installed workflow entry. Returns True if found."""
        if workflow_id in self.data["workflows"]:
            del self.data["workflows"][workflow_id]
            self.save()
            return True
        return False

    def get(self, workflow_id: str) -> dict[str, Any] | None:
        """Get metadata for an installed workflow."""
        return self.data["workflows"].get(workflow_id)

    def list(self) -> dict[str, dict[str, Any]]:
        """Return all installed workflows."""
        return dict(self.data["workflows"])

    def is_installed(self, workflow_id: str) -> bool:
        """Check if a workflow is installed."""
        return workflow_id in self.data["workflows"]


# ---------------------------------------------------------------------------
# WorkflowCatalog
# ---------------------------------------------------------------------------


class WorkflowCatalog:
    """Manages workflow catalog fetching, caching, and searching.

    Resolution order for catalog sources:
    1. ``SPECKIT_WORKFLOW_CATALOG_URL`` env var (overrides all)
    2. Project-level ``.specify/workflow-catalogs.yml``
    3. User-level ``~/.specify/workflow-catalogs.yml``
    4. Built-in defaults (official + community)
    """

    DEFAULT_CATALOG_URL = (
        "https://raw.githubusercontent.com/github/spec-kit/main/"
        "workflows/catalog.json"
    )
    COMMUNITY_CATALOG_URL = (
        "https://raw.githubusercontent.com/github/spec-kit/main/"
        "workflows/catalog.community.json"
    )
    CACHE_DURATION = 3600  # 1 hour

    def __init__(self, project_root: Path) -> None:
        self.project_root = project_root
        self.workflows_dir = project_root / ".specify" / "workflows"
        self.cache_dir = self.workflows_dir / ".cache"

    # -- Catalog resolution -----------------------------------------------

    def _validate_catalog_url(self, url: str) -> None:
        """Validate that a catalog URL uses HTTPS (localhost HTTP allowed)."""
        from urllib.parse import urlparse

        parsed = urlparse(url)
        is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1")
        if parsed.scheme != "https" and not (
            parsed.scheme == "http" and is_localhost
        ):
            raise WorkflowValidationError(
                f"Catalog URL must use HTTPS (got {parsed.scheme}://). "
                "HTTP is only allowed for localhost."
            )
        if not parsed.netloc:
            raise WorkflowValidationError(
                "Catalog URL must be a valid URL with a host."
            )

    def _load_catalog_config(
        self, config_path: Path
    ) -> list[WorkflowCatalogEntry] | None:
        """Load catalog stack configuration from a YAML file."""
        if not config_path.exists():
            return None
        try:
            data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
        except (yaml.YAMLError, OSError, UnicodeError) as exc:
            raise WorkflowValidationError(
                f"Failed to read catalog config {config_path}: {exc}"
            ) from exc
        # A top-level list or scalar has no .get(); reject it explicitly.
        if not isinstance(data, dict):
            raise WorkflowValidationError(
                f"Invalid catalog config {config_path}: expected a mapping, "
                f"got {type(data).__name__}"
            )
        catalogs_data = data.get("catalogs", [])
        if not catalogs_data:
            # Empty catalogs list (e.g. after removing last entry)
            # is valid — fall back to built-in defaults.
            return None
        if not isinstance(catalogs_data, list):
            raise WorkflowValidationError(
                f"Invalid catalog config: 'catalogs' must be a list, "
                f"got {type(catalogs_data).__name__}"
            )
        entries: list[WorkflowCatalogEntry] = []
        for idx, item in enumerate(catalogs_data):
            if not isinstance(item, dict):
                raise WorkflowValidationError(
                    f"Invalid catalog entry at index {idx}: "
                    f"expected a mapping, got {type(item).__name__}"
                )
            url = str(item.get("url", "")).strip()
            if not url:
                continue
            self._validate_catalog_url(url)
            try:
                priority = int(item.get("priority", idx + 1))
            except (TypeError, ValueError) as exc:
                raise WorkflowValidationError(
                    f"Invalid priority for catalog "
                    f"'{item.get('name', idx + 1)}': "
                    f"expected integer, got {item.get('priority')!r}"
                ) from exc
            raw_install = item.get("install_allowed", False)
            if isinstance(raw_install, str):
                install_allowed = raw_install.strip().lower() in (
                    "true",
                    "yes",
                    "1",
                )
            else:
                install_allowed = bool(raw_install)
            entries.append(
                WorkflowCatalogEntry(
                    url=url,
                    name=str(item.get("name", f"catalog-{idx + 1}")),
                    priority=priority,
                    install_allowed=install_allowed,
                    description=str(item.get("description", "")),
                )
            )
        entries.sort(key=lambda e: e.priority)
        if not entries:
            raise WorkflowValidationError(
                f"Catalog config {config_path} contains {len(catalogs_data)} "
                f"entries but none have valid URLs."
            )
        return entries

    def get_active_catalogs(self) -> list[WorkflowCatalogEntry]:
        """Get the ordered list of active catalogs."""
        # 1. Environment variable override
        env_url = os.environ.get("SPECKIT_WORKFLOW_CATALOG_URL", "").strip()
        if env_url:
            self._validate_catalog_url(env_url)
            return [
                WorkflowCatalogEntry(
                    url=env_url,
                    name="env-override",
                    priority=1,
                    install_allowed=True,
                    description="From SPECKIT_WORKFLOW_CATALOG_URL",
                )
            ]

        # 2. Project-level config
        project_config = self.project_root / ".specify" / "workflow-catalogs.yml"
        project_entries = self._load_catalog_config(project_config)
        if project_entries is not None:
            return project_entries

        # 3. User-level config
        home = Path.home()
        user_config = home / ".specify" / "workflow-catalogs.yml"
        user_entries = self._load_catalog_config(user_config)
        if user_entries is not None:
            return user_entries

        # 4. Built-in defaults
        return [
            WorkflowCatalogEntry(
                url=self.DEFAULT_CATALOG_URL,
                name="default",
                priority=1,
                install_allowed=True,
                description="Official workflows",
            ),
            WorkflowCatalogEntry(
                url=self.COMMUNITY_CATALOG_URL,
                name="community",
                priority=2,
                install_allowed=False,
                description="Community-contributed workflows (discovery only)",
            ),
        ]

    # -- Caching ----------------------------------------------------------

    def _get_cache_paths(self, url: str) -> tuple[Path, Path]:
        """Get cache file paths for a URL (hash-based)."""
        url_hash = hashlib.sha256(url.encode()).hexdigest()[:16]
        cache_file = self.cache_dir / f"workflow-catalog-{url_hash}.json"
        meta_file = self.cache_dir / f"workflow-catalog-{url_hash}-meta.json"
        return cache_file, meta_file

    def _is_url_cache_valid(self, url: str) -> bool:
        """Check if cached data for a URL is still fresh."""
        _, meta_file = self._get_cache_paths(url)
        if not meta_file.exists():
            return False
        try:
            with open(meta_file, encoding="utf-8") as f:
                meta = json.load(f)
            fetched_at = meta.get("fetched_at", 0)
            return (time.time() - fetched_at) < self.CACHE_DURATION
        except (json.JSONDecodeError, OSError):
            return False

    def _fetch_single_catalog(
        self, entry: WorkflowCatalogEntry, force_refresh: bool = False
    ) -> dict[str, Any]:
        """Fetch a single catalog, using cache when possible."""
        cache_file, meta_file = self._get_cache_paths(entry.url)

        if not force_refresh and self._is_url_cache_valid(entry.url):
            try:
                with open(cache_file, encoding="utf-8") as f:
                    return json.load(f)
            except (json.JSONDecodeError, OSError):
                pass

        # Fetch from URL — validate scheme before opening and after redirects
        from urllib.parse import urlparse
        from urllib.request import urlopen

        def _validate_catalog_url(url: str) -> None:
            parsed = urlparse(url)
            is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1")
            if parsed.scheme != "https" and not (
                parsed.scheme == "http" and is_localhost
            ):
                raise WorkflowCatalogError(
                    f"Refusing to fetch catalog from non-HTTPS URL: {url}"
                )

        _validate_catalog_url(entry.url)

        try:
            with urlopen(entry.url, timeout=30) as resp:  # noqa: S310
                _validate_catalog_url(resp.geturl())
                data = json.loads(resp.read().decode("utf-8"))
        except Exception as exc:
            # Fall back to cache if available
            if cache_file.exists():
                try:
                    with open(cache_file, encoding="utf-8") as f:
                        return json.load(f)
                except (json.JSONDecodeError, ValueError, OSError):
                    pass
            raise WorkflowCatalogError(
                f"Failed to fetch catalog from {entry.url}: {exc}"
            ) from exc

        if not isinstance(data, dict):
            raise WorkflowCatalogError(
                f"Catalog from {entry.url} is not a valid JSON object."
            )

        # Write cache
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        with open(cache_file, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        with open(meta_file, "w", encoding="utf-8") as f:
            json.dump({"url": entry.url, "fetched_at": time.time()}, f)
        return data

    def _get_merged_workflows(
        self, force_refresh: bool = False
    ) -> dict[str, dict[str, Any]]:
        """Merge workflows from all active catalogs (lower priority number wins)."""
        catalogs = self.get_active_catalogs()
        merged: dict[str, dict[str, Any]] = {}
        fetch_errors = 0
        # Process later/higher-numbered entries first so earlier/lower-numbered
        # entries overwrite them on workflow ID conflicts.
        for entry in reversed(catalogs):
            try:
                data = self._fetch_single_catalog(entry, force_refresh)
            except WorkflowCatalogError:
                fetch_errors += 1
                continue
            workflows = data.get("workflows", {})
            # Handle both dict and list formats
            if isinstance(workflows, dict):
                for wf_id, wf_data in workflows.items():
                    if not isinstance(wf_data, dict):
                        continue
                    wf_data["_catalog_name"] = entry.name
                    wf_data["_install_allowed"] = entry.install_allowed
                    merged[wf_id] = wf_data
            elif isinstance(workflows, list):
                for wf_data in workflows:
                    if not isinstance(wf_data, dict):
                        continue
                    wf_id = wf_data.get("id", "")
                    if wf_id:
                        wf_data["_catalog_name"] = entry.name
                        wf_data["_install_allowed"] = entry.install_allowed
                        merged[wf_id] = wf_data
        if fetch_errors == len(catalogs) and catalogs:
            raise WorkflowCatalogError(
                "All configured catalogs failed to fetch."
            )
        return merged

    # -- Public API -------------------------------------------------------

    def search(
        self,
        query: str | None = None,
        tag: str | None = None,
    ) -> list[dict[str, Any]]:
        """Search workflows across all configured catalogs."""
        merged = self._get_merged_workflows()
        results: list[dict[str, Any]] = []
        for wf_id, wf_data in merged.items():
            wf_data.setdefault("id", wf_id)
            if query:
                q = query.lower()
                searchable = " ".join(
                    [
                        wf_data.get("name", ""),
                        wf_data.get("description", ""),
                        wf_data.get("id", ""),
                    ]
                ).lower()
                if q not in searchable:
                    continue
            if tag:
                raw_tags = wf_data.get("tags", [])
                tags = raw_tags if isinstance(raw_tags, list) else []
                normalized_tags = [t.lower() for t in tags if isinstance(t, str)]
                if tag.lower() not in normalized_tags:
                    continue
            results.append(wf_data)
        return results

    def get_workflow_info(self, workflow_id: str) -> dict[str, Any] | None:
        """Get details for a specific workflow from the catalog."""
        merged = self._get_merged_workflows()
        wf = merged.get(workflow_id)
        if wf:
            wf.setdefault("id", workflow_id)
        return wf

    def get_catalog_configs(self) -> list[dict[str, Any]]:
        """Return current catalog configuration as a list of dicts."""
        entries = self.get_active_catalogs()
        return [
            {
                "name": e.name,
                "url": e.url,
                "priority": e.priority,
                "install_allowed": e.install_allowed,
                "description": e.description,
            }
            for e in entries
        ]

    def add_catalog(self, url: str, name: str | None = None) -> None:
        """Add a catalog source to the project-level config."""
        self._validate_catalog_url(url)
        config_path = self.project_root / ".specify" / "workflow-catalogs.yml"
        data: dict[str, Any] = {"catalogs": []}
        if config_path.exists():
            raw = yaml.safe_load(config_path.read_text(encoding="utf-8"))
            if not isinstance(raw, dict):
                raise WorkflowValidationError(
                    "Catalog config file is corrupted (expected a mapping)."
                )
            data = raw
        catalogs = data.get("catalogs", [])
        if not isinstance(catalogs, list):
            raise WorkflowValidationError(
                "Catalog config 'catalogs' must be a list."
            )
        # Check for duplicate URL (guard against non-dict entries)
        for cat in catalogs:
            if isinstance(cat, dict) and cat.get("url") == url:
                raise WorkflowValidationError(
                    f"Catalog URL already configured: {url}"
                )
        # Derive priority from the highest existing priority + 1
        max_priority = max(
            (cat.get("priority", 0) for cat in catalogs if isinstance(cat, dict)),
            default=0,
        )
        catalogs.append(
            {
                "name": name or f"catalog-{len(catalogs) + 1}",
                "url": url,
                "priority": max_priority + 1,
                "install_allowed": True,
                "description": "",
            }
        )
        data["catalogs"] = catalogs
        config_path.parent.mkdir(parents=True, exist_ok=True)
        with open(config_path, "w", encoding="utf-8") as f:
            yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True)

    def remove_catalog(self, index: int) -> str:
        """Remove a catalog source by index (0-based). Returns the removed name."""
        config_path = self.project_root / ".specify" / "workflow-catalogs.yml"
        if not config_path.exists():
            raise WorkflowValidationError("No catalog config file found.")
        data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
        if not isinstance(data, dict):
            raise WorkflowValidationError(
                "Catalog config file is corrupted (expected a mapping)."
            )
        catalogs = data.get("catalogs", [])
        if not isinstance(catalogs, list):
            raise WorkflowValidationError(
                "Catalog config 'catalogs' must be a list."
            )
        if index < 0 or index >= len(catalogs):
            raise WorkflowValidationError(
                f"Catalog index {index} out of range (0-{len(catalogs) - 1})."
            )
        removed = catalogs.pop(index)
        data["catalogs"] = catalogs
        with open(config_path, "w", encoding="utf-8") as f:
            yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True)
        if isinstance(removed, dict):
            return removed.get("name", f"catalog-{index + 1}")
        return f"catalog-{index + 1}"
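
The "lower priority number wins" merge performed by `_get_merged_workflows` can be illustrated without any network access. The sketch below is a simplified stand-in, not the module's code: `merge_by_priority` is a hypothetical helper that takes `(priority, workflows)` pairs directly instead of fetching catalogs.

```python
from __future__ import annotations

from typing import Any


def merge_by_priority(
    catalogs: list[tuple[int, dict[str, dict[str, Any]]]],
) -> dict[str, dict[str, Any]]:
    """Merge workflow maps so the lowest priority number wins on ID conflicts."""
    merged: dict[str, dict[str, Any]] = {}
    # Apply higher-numbered catalogs first; lower-numbered ones then overwrite.
    for _priority, workflows in sorted(catalogs, key=lambda c: c[0], reverse=True):
        merged.update(workflows)
    return merged


official = {"speckit": {"name": "Official speckit"}}
community = {"speckit": {"name": "Community fork"}, "extra": {"name": "Extra"}}
result = merge_by_priority([(2, community), (1, official)])
print(result["speckit"]["name"])
print(sorted(result))
```

Processing catalogs in reverse priority order keeps the merge a plain sequence of dictionary updates, which is the same approach the method above takes with `reversed(catalogs)`.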