mirror of
https://github.com/github/spec-kit.git
synced 2026-07-06 05:53:12 +08:00
* Initial plan * Add workflow engine with step registry, expression engine, catalog system, and CLI commands Agent-Logs-Url: https://github.com/github/spec-kit/sessions/72a7bb5d-071f-4d67-a507-7e1abae2384d Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> * Add comprehensive tests for workflow engine (94 tests) Agent-Logs-Url: https://github.com/github/spec-kit/sessions/72a7bb5d-071f-4d67-a507-7e1abae2384d Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> * Address review feedback: do-while condition preservation and URL scheme validation Agent-Logs-Url: https://github.com/github/spec-kit/sessions/72a7bb5d-071f-4d67-a507-7e1abae2384d Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> * Address review feedback, add CLI dispatch, interactive gates, and docs Review comments (7/7): - Add explanatory comment to empty except block - Implement workflow catalog download with cleanup on failure - Add input type coercion for number/boolean/enum - Fix example workflow to remove non-existent output references - Fix while_loop and if_then condition defaults (string 'false' → bool False) - Fix resume step index tracking with step_offset parameter CLI dispatch: - Add build_exec_args() and dispatch_command() to IntegrationBase - Override for Claude (skills: /speckit-specify), Gemini (-m flag), Codex (codex exec), Copilot (--agent speckit.specify) - CommandStep invokes installed commands by name via integration CLI - Add PromptStep for arbitrary inline prompts (10th step type) - Stream CLI output live to terminal (no silent blocking) - Remove timeout when streaming (user can Ctrl+C) - Ctrl+C saves state as PAUSED for clean resume Interactive gates: - Gate steps prompt [1] approve [2] reject in TTY - Fall back to PAUSED in non-interactive environments - Resume re-executes the gate for interactive prompting Documentation: - workflows/README.md — user guide - workflows/ARCHITECTURE.md — internals with Mermaid diagrams - workflows/PUBLISHING.md — catalog submission guide Tests: 94 → 122 workflow tests, 1362 total (all passing) * Fix ruff lint errors: unused imports, f-string placeholders, undefined name * Address second review: registry-backed validation, shell failures, loop/fan-out execution, URL validation - VALID_STEP_TYPES now queries STEP_REGISTRY dynamically - Shell step returns FAILED on non-zero exit code - Persist workflow YAML in run directory for reliable resume - Resume loads from run copy, falls back to installed workflow - Engine iterates while/do-while loops up to max_iterations - Engine expands fan-out per item with context.item - HTTPS URL validation for catalog workflow installs (HTTP allowed for localhost) - Fix catalog merge priority docstring (lower number wins) - Fix dispatch_command docstring (no build_exec_args_for_command) - Gate on_reject=retry pauses for re-prompt on resume - Update docs to 10 step types, add prompt step to tables and README * Potential fix for pull request finding 'Empty except' Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com> * Address third review: fan-out IDs, catalog guards, shell coercion, docs - Fan-out generates unique per-item step IDs and collects results - Catalog merge skips non-dict workflow entries (malformed data guard) - Shell step coerces run_cmd to str after expression evaluation - urlopen timeout=30 for catalog workflow installs - yaml.dump with sort_keys=False, allow_unicode=True for catalog configs - Document streaming timeout as intentionally unbounded (user Ctrl+C) - Document --allow-all-tools as required for non-interactive + future enhancement - Update test docstring and PUBLISHING.md to 10 step types with prompt * Validate final URL after redirects in catalog fetch urlopen follows redirects, so validate the response URL against the same HTTPS/localhost rules to prevent redirect-based downgrade attacks. * Address fourth review: filter arg eval, tags normalization, install redirect check - Filter arguments now evaluated via _evaluate_simple_expression() so default(42) returns int not string - Tags normalized: non-list/non-string values handled gracefully - Install URL redirect validation (same as catalog fetch) - Remove unused 'skipped' variable in catalog config parsing - Author 'github' → 'GitHub' in example workflow - Document nested step resume limitation (re-runs parent step) * Add explanatory comment to empty except ValueError block * Address fifth review: expression parsing, fan-out output, URL install, gate options - Move string literal parsing before operator detection in expressions so quoted strings with operators (e.g. 'a in b') are not mis-parsed - Fan-out: remove max_concurrency from persisted output, fix docstring to reflect sequential execution - workflow add: support URL sources with HTTPS/redirect validation, validate workflow ID is non-empty before writing files - Deduplicate local install logic via _validate_and_install_local() - Remove 'edit' gate option from speckit workflow (not implemented) * Add comments to empty except ValueError blocks in URL install * Address sixth review: operator precedence, fan_in cleanup, registry resilience, docs - Fix or/and operator precedence (or parsed first = lower precedence) - Restore context.fan_in after fan-in step completes - Catch JSONDecodeError in registry load for corrupted files - Replace print() with on_step_start callback (library-safe) - Gate validation warns when on_reject set but no reject option - Shell step: document shell=True security tradeoff - README: sdd-pipeline → speckit, parallel → sequential for fan-out - ARCHITECTURE.md: parallel → fan-out/fan-in in diagram * Address seventh review: string literal before pipe, type annotations, validate on install - Move string literal check above pipe filter parsing so 'a | b' works - Fix type annotations: input_values list[str] | None, run_id str | None - Run validate_workflow() before installing from local path/URL - Remove duplicate string literal check from expression parser * Address eighth review: fan-out namespaced IDs, early return, catalog validation - Fan-out per-item step IDs use _fanout_{step_id}_{base}_{idx} namespace to avoid collisions with user-defined step IDs - Early return after fan-out loop when state is paused/failed/aborted - Catalog installs parse + validate downloaded YAML before registering, using definition metadata instead of catalog entry for registry * Address ninth review: populate catalog, fix indentation, priority, README - Add speckit workflow entry to catalog.json so it's discoverable - Fix shell step output dict indentation - Catalog add_catalog priority derived from max existing + 1 - README Quick Start clarified with install + local file examples * Address tenth review: max_iterations validation, catalog config guard, version alignment - Validate max_iterations is int >= 1 in while and do-while steps - Guard add_catalog against corrupted config (non-dict/non-list) - Align speckit_version requirement to >=0.6.1 (current package version) - Fan-out template validation uses separate seen_ids set to avoid false duplication errors with user-defined step IDs * Address eleventh review: command step fails without CLI, ID mismatch warning, state persistence - Command step returns FAILED when CLI not installed (was silent COMPLETED) - Catalog install warns on workflow ID vs catalog key mismatch - Engine persists state.save() before returning on unknown step type - Update tests to expect FAILED for command steps without CLI - Integration tests use shell steps for CLI-independent execution * Address twelfth review: type annotations, version examples, streaming docs, requires - Fix workflow_search type annotations (str | None) - PUBLISHING.md: speckit_version >=0.15.0 → >=0.6.1 - Document that exit_code is captured and referenceable by later steps - Mark requires as declared-but-not-enforced (planned enhancement) - Note full stdout/stderr capture as planned enhancement * Enforce catalog key matches workflow ID (fail instead of warn) * Bundle speckit workflow: auto-install during specify init - Add workflows/speckit to pyproject.toml force-include for wheel builds - Add _locate_bundled_workflow() helper (mirrors _locate_bundled_extension) - Auto-install speckit workflow during specify init (after git extension) - Update all integration file inventory tests to expect workflow files * Address fourteenth review: prompt fails without CLI, resolved step data, fan-out normalization - PromptStep returns FAILED when CLI not installed (was silent COMPLETED) - Engine step_data prefers resolved values from step output - Fan-out normalizes output.results=[] for empty item lists - subprocess.run inherits stdout/stderr (no explicit sys.stdout) - Registry tests use issubset for extensibility * Address fifteenth review: fan_in docstring, gate defaults, validation guards, reserved prefix - FanInStep docstring: aggregate-only, no blocking semantics - FanInStep: guard output_config as dict, handle None - Gate validate: use same default options as execute - Validate inputs is dict and steps is list before iterating - Reserve _fanout_ prefix in step ID validation - PUBLISHING.md: remove unenforced checklist items, add _fanout_ note * Address sixteenth review: docs regex, fan_in try/finally, hyphenated dot-path keys - PUBLISHING.md: update ID regex docs to match implementation (single-char OK) - FanInStep: wrap expression evaluation in try/finally for context.fan_in - Expression dot-path: allow hyphens in keys before list index (e.g. run-tests[0]) * Make speckit workflow integration-agnostic, document Copilot CLI requirement - Workflow integration selectable via input (default: claude) - Each command step uses {{ inputs.integration }} instead of hardcoded copilot - Copilot docstring documents CLI requirement for workflow dispatch - Added install_url for Copilot CLI docs * Address seventeenth review: project checks, catalog robustness - Add .specify/ project check to workflow run/resume/status/search/info - remove_catalog validates config shape (dict + list) before indexing - _fetch_single_catalog validates response is a dict - _get_merged_workflows raises when all catalogs fail to fetch - add_catalog guards against non-dict catalog entries in config * Address eighteenth review: condition coercion, gate abort result, while default, cache guard, resume log - evaluate_condition treats plain 'false'/'true' strings as booleans - Gate abort returns StepResult(FAILED) instead of raising exception so step output is persisted in state for inspection - while_loop max_iterations optional (default 10), validation aligned - Catalog cache fallback catches invalid JSON gracefully - resume() appends workflow_finished log entry like execute() * Address nineteenth review: allow-all-tools opt-in, empty catalogs, abort dead code, while docstring - --allow-all-tools controlled by SPECKIT_ALLOW_ALL_TOOLS env var (default: 1) Set to 0 to disable automatic tool approval for Copilot CLI - Empty catalogs list falls back to built-in defaults (not an error) - Remove unreachable WorkflowAbortError catches from execute/resume (gate abort now returns StepResult(FAILED) instead of raising) - while_loop docstring updated: max_iterations is optional (default 10) * Address twentieth review: gate abort maps to ABORTED status, do-while max_iterations optional - Engine detects output.aborted from gate step and sets RunStatus.ABORTED (was unreachable — gate abort returned FAILED but status was always FAILED) - do-while max_iterations now optional (default 10), aligned with while_loop - do-while docstring and validation updated accordingly * Coerce default_options to dict, align bundled workflow ID regex with validator * Gate validates string options, prompt uses resolved integration, loop normalizes max_iterations * Use parentId:childId convention for nested step IDs - Fan-out per-item IDs use parentId:templateId:index (e.g. parallel:impl:0) - Reserve ':' in user step IDs (validation rejects) - Replaces _fanout_ prefix with cleaner namespacing - Expressions like {{ steps.parallel:impl:0.output.file }} work naturally * Validate workflow version is semantic versioning (X.Y.Z) * Schema version validation, strict semver, load_workflow docstring, preserve max_concurrency - Validate schema_version is '1.0' (reject unknown future schemas) - Strict semver regex: ^\d+\.\d+\.\d+$ (rejects 1.0.0beta etc.) - load_workflow docstring: 'parsed' not 'validated' - Keep max_concurrency in fan-out output (was dropped) - do_while docstring: engine re-evaluates step_config condition - ARCHITECTURE.md: document nested resume limitation * Path traversal prevention, loop step ID namespacing - RunState validates run_id is alphanumeric+hyphens (no path separators) - workflow_add validates catalog source doesn't escape workflows_dir - Loop iterations namespace nested step IDs as parentId:childId:iteration so multiple iterations don't overwrite each other in context/state --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
541 lines
20 KiB
Python
541 lines
20 KiB
Python
"""Workflow catalog — discovery, install, and management of workflows.
|
|
|
|
Mirrors the existing extension/preset catalog pattern with:
|
|
- Multi-catalog stack (env var → project → user → built-in)
|
|
- SHA256-hashed per-URL caching with 1-hour TTL
|
|
- Workflow registry for installed workflow tracking
|
|
- Search across all configured catalog sources
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import time
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import yaml
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Errors
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class WorkflowCatalogError(Exception):
|
|
"""Base error for workflow catalog operations."""
|
|
|
|
|
|
class WorkflowValidationError(WorkflowCatalogError):
|
|
"""Validation error for catalog config or workflow data."""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CatalogEntry
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
|
|
class WorkflowCatalogEntry:
|
|
"""Represents a single catalog source in the catalog stack."""
|
|
|
|
url: str
|
|
name: str
|
|
priority: int
|
|
install_allowed: bool
|
|
description: str = ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# WorkflowRegistry
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class WorkflowRegistry:
|
|
"""Manages the registry of installed workflows.
|
|
|
|
Tracks installed workflows and their metadata in
|
|
``.specify/workflows/workflow-registry.json``.
|
|
"""
|
|
|
|
REGISTRY_FILE = "workflow-registry.json"
|
|
SCHEMA_VERSION = "1.0"
|
|
|
|
def __init__(self, project_root: Path) -> None:
|
|
self.project_root = project_root
|
|
self.workflows_dir = project_root / ".specify" / "workflows"
|
|
self.registry_path = self.workflows_dir / self.REGISTRY_FILE
|
|
self.data = self._load()
|
|
|
|
def _load(self) -> dict[str, Any]:
|
|
"""Load registry from disk or create default."""
|
|
if self.registry_path.exists():
|
|
try:
|
|
with open(self.registry_path, encoding="utf-8") as f:
|
|
return json.load(f)
|
|
except (json.JSONDecodeError, ValueError):
|
|
# Corrupted registry file — reset to default
|
|
return {"schema_version": self.SCHEMA_VERSION, "workflows": {}}
|
|
return {"schema_version": self.SCHEMA_VERSION, "workflows": {}}
|
|
|
|
def save(self) -> None:
|
|
"""Persist registry to disk."""
|
|
self.workflows_dir.mkdir(parents=True, exist_ok=True)
|
|
with open(self.registry_path, "w", encoding="utf-8") as f:
|
|
json.dump(self.data, f, indent=2)
|
|
|
|
def add(self, workflow_id: str, metadata: dict[str, Any]) -> None:
|
|
"""Add or update an installed workflow entry."""
|
|
from datetime import datetime, timezone
|
|
|
|
existing = self.data["workflows"].get(workflow_id, {})
|
|
metadata["installed_at"] = existing.get(
|
|
"installed_at", datetime.now(timezone.utc).isoformat()
|
|
)
|
|
metadata["updated_at"] = datetime.now(timezone.utc).isoformat()
|
|
self.data["workflows"][workflow_id] = metadata
|
|
self.save()
|
|
|
|
def remove(self, workflow_id: str) -> bool:
|
|
"""Remove an installed workflow entry. Returns True if found."""
|
|
if workflow_id in self.data["workflows"]:
|
|
del self.data["workflows"][workflow_id]
|
|
self.save()
|
|
return True
|
|
return False
|
|
|
|
def get(self, workflow_id: str) -> dict[str, Any] | None:
|
|
"""Get metadata for an installed workflow."""
|
|
return self.data["workflows"].get(workflow_id)
|
|
|
|
def list(self) -> dict[str, dict[str, Any]]:
|
|
"""Return all installed workflows."""
|
|
return dict(self.data["workflows"])
|
|
|
|
def is_installed(self, workflow_id: str) -> bool:
|
|
"""Check if a workflow is installed."""
|
|
return workflow_id in self.data["workflows"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# WorkflowCatalog
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class WorkflowCatalog:
|
|
"""Manages workflow catalog fetching, caching, and searching.
|
|
|
|
Resolution order for catalog sources:
|
|
1. ``SPECKIT_WORKFLOW_CATALOG_URL`` env var (overrides all)
|
|
2. Project-level ``.specify/workflow-catalogs.yml``
|
|
3. User-level ``~/.specify/workflow-catalogs.yml``
|
|
4. Built-in defaults (official + community)
|
|
"""
|
|
|
|
DEFAULT_CATALOG_URL = (
|
|
"https://raw.githubusercontent.com/github/spec-kit/main/"
|
|
"workflows/catalog.json"
|
|
)
|
|
COMMUNITY_CATALOG_URL = (
|
|
"https://raw.githubusercontent.com/github/spec-kit/main/"
|
|
"workflows/catalog.community.json"
|
|
)
|
|
CACHE_DURATION = 3600 # 1 hour
|
|
|
|
def __init__(self, project_root: Path) -> None:
|
|
self.project_root = project_root
|
|
self.workflows_dir = project_root / ".specify" / "workflows"
|
|
self.cache_dir = self.workflows_dir / ".cache"
|
|
|
|
# -- Catalog resolution -----------------------------------------------
|
|
|
|
def _validate_catalog_url(self, url: str) -> None:
|
|
"""Validate that a catalog URL uses HTTPS (localhost HTTP allowed)."""
|
|
from urllib.parse import urlparse
|
|
|
|
parsed = urlparse(url)
|
|
is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1")
|
|
if parsed.scheme != "https" and not (
|
|
parsed.scheme == "http" and is_localhost
|
|
):
|
|
raise WorkflowValidationError(
|
|
f"Catalog URL must use HTTPS (got {parsed.scheme}://). "
|
|
"HTTP is only allowed for localhost."
|
|
)
|
|
if not parsed.netloc:
|
|
raise WorkflowValidationError(
|
|
"Catalog URL must be a valid URL with a host."
|
|
)
|
|
|
|
def _load_catalog_config(
|
|
self, config_path: Path
|
|
) -> list[WorkflowCatalogEntry] | None:
|
|
"""Load catalog stack configuration from a YAML file."""
|
|
if not config_path.exists():
|
|
return None
|
|
try:
|
|
data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
|
|
except (yaml.YAMLError, OSError, UnicodeError) as exc:
|
|
raise WorkflowValidationError(
|
|
f"Failed to read catalog config {config_path}: {exc}"
|
|
)
|
|
catalogs_data = data.get("catalogs", [])
|
|
if not catalogs_data:
|
|
# Empty catalogs list (e.g. after removing last entry)
|
|
# is valid — fall back to built-in defaults.
|
|
return None
|
|
if not isinstance(catalogs_data, list):
|
|
raise WorkflowValidationError(
|
|
f"Invalid catalog config: 'catalogs' must be a list, "
|
|
f"got {type(catalogs_data).__name__}"
|
|
)
|
|
|
|
entries: list[WorkflowCatalogEntry] = []
|
|
for idx, item in enumerate(catalogs_data):
|
|
if not isinstance(item, dict):
|
|
raise WorkflowValidationError(
|
|
f"Invalid catalog entry at index {idx}: "
|
|
f"expected a mapping, got {type(item).__name__}"
|
|
)
|
|
url = str(item.get("url", "")).strip()
|
|
if not url:
|
|
continue
|
|
self._validate_catalog_url(url)
|
|
try:
|
|
priority = int(item.get("priority", idx + 1))
|
|
except (TypeError, ValueError):
|
|
raise WorkflowValidationError(
|
|
f"Invalid priority for catalog "
|
|
f"'{item.get('name', idx + 1)}': "
|
|
f"expected integer, got {item.get('priority')!r}"
|
|
)
|
|
raw_install = item.get("install_allowed", False)
|
|
if isinstance(raw_install, str):
|
|
install_allowed = raw_install.strip().lower() in (
|
|
"true",
|
|
"yes",
|
|
"1",
|
|
)
|
|
else:
|
|
install_allowed = bool(raw_install)
|
|
entries.append(
|
|
WorkflowCatalogEntry(
|
|
url=url,
|
|
name=str(item.get("name", f"catalog-{idx + 1}")),
|
|
priority=priority,
|
|
install_allowed=install_allowed,
|
|
description=str(item.get("description", "")),
|
|
)
|
|
)
|
|
entries.sort(key=lambda e: e.priority)
|
|
if not entries:
|
|
raise WorkflowValidationError(
|
|
f"Catalog config {config_path} contains {len(catalogs_data)} "
|
|
f"entries but none have valid URLs."
|
|
)
|
|
return entries
|
|
|
|
def get_active_catalogs(self) -> list[WorkflowCatalogEntry]:
|
|
"""Get the ordered list of active catalogs."""
|
|
# 1. Environment variable override
|
|
env_url = os.environ.get("SPECKIT_WORKFLOW_CATALOG_URL", "").strip()
|
|
if env_url:
|
|
self._validate_catalog_url(env_url)
|
|
return [
|
|
WorkflowCatalogEntry(
|
|
url=env_url,
|
|
name="env-override",
|
|
priority=1,
|
|
install_allowed=True,
|
|
description="From SPECKIT_WORKFLOW_CATALOG_URL",
|
|
)
|
|
]
|
|
|
|
# 2. Project-level config
|
|
project_config = self.project_root / ".specify" / "workflow-catalogs.yml"
|
|
project_entries = self._load_catalog_config(project_config)
|
|
if project_entries is not None:
|
|
return project_entries
|
|
|
|
# 3. User-level config
|
|
home = Path.home()
|
|
user_config = home / ".specify" / "workflow-catalogs.yml"
|
|
user_entries = self._load_catalog_config(user_config)
|
|
if user_entries is not None:
|
|
return user_entries
|
|
|
|
# 4. Built-in defaults
|
|
return [
|
|
WorkflowCatalogEntry(
|
|
url=self.DEFAULT_CATALOG_URL,
|
|
name="default",
|
|
priority=1,
|
|
install_allowed=True,
|
|
description="Official workflows",
|
|
),
|
|
WorkflowCatalogEntry(
|
|
url=self.COMMUNITY_CATALOG_URL,
|
|
name="community",
|
|
priority=2,
|
|
install_allowed=False,
|
|
description="Community-contributed workflows (discovery only)",
|
|
),
|
|
]
|
|
|
|
# -- Caching ----------------------------------------------------------
|
|
|
|
def _get_cache_paths(self, url: str) -> tuple[Path, Path]:
|
|
"""Get cache file paths for a URL (hash-based)."""
|
|
url_hash = hashlib.sha256(url.encode()).hexdigest()[:16]
|
|
cache_file = self.cache_dir / f"workflow-catalog-{url_hash}.json"
|
|
meta_file = self.cache_dir / f"workflow-catalog-{url_hash}-meta.json"
|
|
return cache_file, meta_file
|
|
|
|
def _is_url_cache_valid(self, url: str) -> bool:
|
|
"""Check if cached data for a URL is still fresh."""
|
|
_, meta_file = self._get_cache_paths(url)
|
|
if not meta_file.exists():
|
|
return False
|
|
try:
|
|
with open(meta_file, encoding="utf-8") as f:
|
|
meta = json.load(f)
|
|
fetched_at = meta.get("fetched_at", 0)
|
|
return (time.time() - fetched_at) < self.CACHE_DURATION
|
|
except (json.JSONDecodeError, OSError):
|
|
return False
|
|
|
|
def _fetch_single_catalog(
|
|
self, entry: WorkflowCatalogEntry, force_refresh: bool = False
|
|
) -> dict[str, Any]:
|
|
"""Fetch a single catalog, using cache when possible."""
|
|
cache_file, meta_file = self._get_cache_paths(entry.url)
|
|
|
|
if not force_refresh and self._is_url_cache_valid(entry.url):
|
|
try:
|
|
with open(cache_file, encoding="utf-8") as f:
|
|
return json.load(f)
|
|
except (json.JSONDecodeError, OSError):
|
|
pass
|
|
|
|
# Fetch from URL — validate scheme before opening and after redirects
|
|
from urllib.parse import urlparse
|
|
from urllib.request import urlopen
|
|
|
|
def _validate_catalog_url(url: str) -> None:
|
|
parsed = urlparse(url)
|
|
is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1")
|
|
if parsed.scheme != "https" and not (
|
|
parsed.scheme == "http" and is_localhost
|
|
):
|
|
raise WorkflowCatalogError(
|
|
f"Refusing to fetch catalog from non-HTTPS URL: {url}"
|
|
)
|
|
|
|
_validate_catalog_url(entry.url)
|
|
|
|
try:
|
|
with urlopen(entry.url, timeout=30) as resp: # noqa: S310
|
|
_validate_catalog_url(resp.geturl())
|
|
data = json.loads(resp.read().decode("utf-8"))
|
|
except Exception as exc:
|
|
# Fall back to cache if available
|
|
if cache_file.exists():
|
|
try:
|
|
with open(cache_file, encoding="utf-8") as f:
|
|
return json.load(f)
|
|
except (json.JSONDecodeError, ValueError, OSError):
|
|
pass
|
|
raise WorkflowCatalogError(
|
|
f"Failed to fetch catalog from {entry.url}: {exc}"
|
|
) from exc
|
|
|
|
if not isinstance(data, dict):
|
|
raise WorkflowCatalogError(
|
|
f"Catalog from {entry.url} is not a valid JSON object."
|
|
)
|
|
|
|
# Write cache
|
|
self.cache_dir.mkdir(parents=True, exist_ok=True)
|
|
with open(cache_file, "w", encoding="utf-8") as f:
|
|
json.dump(data, f, indent=2)
|
|
with open(meta_file, "w", encoding="utf-8") as f:
|
|
json.dump({"url": entry.url, "fetched_at": time.time()}, f)
|
|
|
|
return data
|
|
|
|
def _get_merged_workflows(
|
|
self, force_refresh: bool = False
|
|
) -> dict[str, dict[str, Any]]:
|
|
"""Merge workflows from all active catalogs (lower priority number wins)."""
|
|
catalogs = self.get_active_catalogs()
|
|
merged: dict[str, dict[str, Any]] = {}
|
|
fetch_errors = 0
|
|
|
|
# Process later/higher-numbered entries first so earlier/lower-numbered
|
|
# entries overwrite them on workflow ID conflicts.
|
|
for entry in reversed(catalogs):
|
|
try:
|
|
data = self._fetch_single_catalog(entry, force_refresh)
|
|
except WorkflowCatalogError:
|
|
fetch_errors += 1
|
|
continue
|
|
workflows = data.get("workflows", {})
|
|
# Handle both dict and list formats
|
|
if isinstance(workflows, dict):
|
|
for wf_id, wf_data in workflows.items():
|
|
if not isinstance(wf_data, dict):
|
|
continue
|
|
wf_data["_catalog_name"] = entry.name
|
|
wf_data["_install_allowed"] = entry.install_allowed
|
|
merged[wf_id] = wf_data
|
|
elif isinstance(workflows, list):
|
|
for wf_data in workflows:
|
|
if not isinstance(wf_data, dict):
|
|
continue
|
|
wf_id = wf_data.get("id", "")
|
|
if wf_id:
|
|
wf_data["_catalog_name"] = entry.name
|
|
wf_data["_install_allowed"] = entry.install_allowed
|
|
merged[wf_id] = wf_data
|
|
if fetch_errors == len(catalogs) and catalogs:
|
|
raise WorkflowCatalogError(
|
|
"All configured catalogs failed to fetch."
|
|
)
|
|
return merged
|
|
|
|
# -- Public API -------------------------------------------------------
|
|
|
|
def search(
|
|
self,
|
|
query: str | None = None,
|
|
tag: str | None = None,
|
|
) -> list[dict[str, Any]]:
|
|
"""Search workflows across all configured catalogs."""
|
|
merged = self._get_merged_workflows()
|
|
results: list[dict[str, Any]] = []
|
|
|
|
for wf_id, wf_data in merged.items():
|
|
wf_data.setdefault("id", wf_id)
|
|
if query:
|
|
q = query.lower()
|
|
searchable = " ".join(
|
|
[
|
|
wf_data.get("name", ""),
|
|
wf_data.get("description", ""),
|
|
wf_data.get("id", ""),
|
|
]
|
|
).lower()
|
|
if q not in searchable:
|
|
continue
|
|
if tag:
|
|
raw_tags = wf_data.get("tags", [])
|
|
tags = raw_tags if isinstance(raw_tags, list) else []
|
|
normalized_tags = [t.lower() for t in tags if isinstance(t, str)]
|
|
if tag.lower() not in normalized_tags:
|
|
continue
|
|
results.append(wf_data)
|
|
return results
|
|
|
|
def get_workflow_info(self, workflow_id: str) -> dict[str, Any] | None:
|
|
"""Get details for a specific workflow from the catalog."""
|
|
merged = self._get_merged_workflows()
|
|
wf = merged.get(workflow_id)
|
|
if wf:
|
|
wf.setdefault("id", workflow_id)
|
|
return wf
|
|
|
|
def get_catalog_configs(self) -> list[dict[str, Any]]:
|
|
"""Return current catalog configuration as a list of dicts."""
|
|
entries = self.get_active_catalogs()
|
|
return [
|
|
{
|
|
"name": e.name,
|
|
"url": e.url,
|
|
"priority": e.priority,
|
|
"install_allowed": e.install_allowed,
|
|
"description": e.description,
|
|
}
|
|
for e in entries
|
|
]
|
|
|
|
def add_catalog(self, url: str, name: str | None = None) -> None:
|
|
"""Add a catalog source to the project-level config."""
|
|
self._validate_catalog_url(url)
|
|
config_path = self.project_root / ".specify" / "workflow-catalogs.yml"
|
|
|
|
data: dict[str, Any] = {"catalogs": []}
|
|
if config_path.exists():
|
|
raw = yaml.safe_load(config_path.read_text(encoding="utf-8"))
|
|
if not isinstance(raw, dict):
|
|
raise WorkflowValidationError(
|
|
"Catalog config file is corrupted (expected a mapping)."
|
|
)
|
|
data = raw
|
|
|
|
catalogs = data.get("catalogs", [])
|
|
if not isinstance(catalogs, list):
|
|
raise WorkflowValidationError(
|
|
"Catalog config 'catalogs' must be a list."
|
|
)
|
|
# Check for duplicate URL (guard against non-dict entries)
|
|
for cat in catalogs:
|
|
if isinstance(cat, dict) and cat.get("url") == url:
|
|
raise WorkflowValidationError(
|
|
f"Catalog URL already configured: {url}"
|
|
)
|
|
|
|
# Derive priority from the highest existing priority + 1
|
|
max_priority = max(
|
|
(cat.get("priority", 0) for cat in catalogs if isinstance(cat, dict)),
|
|
default=0,
|
|
)
|
|
catalogs.append(
|
|
{
|
|
"name": name or f"catalog-{len(catalogs) + 1}",
|
|
"url": url,
|
|
"priority": max_priority + 1,
|
|
"install_allowed": True,
|
|
"description": "",
|
|
}
|
|
)
|
|
data["catalogs"] = catalogs
|
|
|
|
config_path.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(config_path, "w", encoding="utf-8") as f:
|
|
yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True)
|
|
|
|
def remove_catalog(self, index: int) -> str:
|
|
"""Remove a catalog source by index (0-based). Returns the removed name."""
|
|
config_path = self.project_root / ".specify" / "workflow-catalogs.yml"
|
|
if not config_path.exists():
|
|
raise WorkflowValidationError("No catalog config file found.")
|
|
|
|
data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
|
|
if not isinstance(data, dict):
|
|
raise WorkflowValidationError(
|
|
"Catalog config file is corrupted (expected a mapping)."
|
|
)
|
|
catalogs = data.get("catalogs", [])
|
|
if not isinstance(catalogs, list):
|
|
raise WorkflowValidationError(
|
|
"Catalog config 'catalogs' must be a list."
|
|
)
|
|
|
|
if index < 0 or index >= len(catalogs):
|
|
raise WorkflowValidationError(
|
|
f"Catalog index {index} out of range (0-{len(catalogs) - 1})."
|
|
)
|
|
|
|
removed = catalogs.pop(index)
|
|
data["catalogs"] = catalogs
|
|
|
|
with open(config_path, "w", encoding="utf-8") as f:
|
|
yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True)
|
|
|
|
if isinstance(removed, dict):
|
|
return removed.get("name", f"catalog-{index + 1}")
|
|
return f"catalog-{index + 1}"
|