mirror of
https://github.com/github/spec-kit.git
synced 2026-07-04 04:45:43 +08:00
* Initial plan * feat: add authentication provider registry (GitHub + Azure DevOps) Agent-Logs-Url: https://github.com/github/spec-kit/sessions/da7ecfd0-e1c9-48dc-b692-27be0879e976 Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> * feat: add try-each-provider HTTP helper and wire all catalog fetches through auth registry - Add authentication/http.py with open_url() that tries each configured provider in registry order, falling through on 401/403 to the next, and finally to unauthenticated - Add build_request() for one-shot request construction - Add configured_providers() to registry __init__ - Remove api_base_url() from AuthProvider ABC (unused) - Remove hosts attribute from providers (no host matching) - Replace _github_http.py usage in ExtensionCatalog and PresetCatalog - Wire IntegrationCatalog and WorkflowCatalog through open_url (were unauthenticated) - Wire _fetch_latest_release_tag() through open_url - Wire all inline --from-url downloads through open_url - Fix unused stub variable flagged by code-quality bot - 49 auth tests (positive + negative), 1805 total tests passing * fix: address review — fix stale docstrings, restore Accept header, add extra_headers to open_url - Fix _open_url() docstrings in extensions.py and presets.py that incorrectly claimed redirect stripping behavior - Add extra_headers parameter to open_url() so callers can pass additional headers (e.g. Accept) that persist across retries - Restore Accept: application/vnd.github+json header in _fetch_latest_release_tag() via extra_headers * feat: config-driven opt-in auth via ~/.specify/auth.json Security-first redesign: no credentials are sent unless the user explicitly creates ~/.specify/auth.json mapping hosts to providers. - Add authentication/config.py: loads and validates auth.json with host-to-provider mappings, supports token/token_env/azure-ad/azure-cli - Refactor AuthProvider ABC: auth_headers(token, scheme) + resolve_token(entry) - Refactor GitHubAuth: bearer scheme only, token from config entry - Refactor AzureDevOpsAuth: 4 schemes (basic-pat, bearer, azure-cli, azure-ad) with dynamic token acquisition for azure-cli and azure-ad - Rewrite authentication/http.py: host matching, redirect stripping, provider fallthrough on 401/403, unauthenticated fallback - Add docs/reference/authentication.md with full reference and template - 1823 tests passing (67 auth-specific) * fix: address review — unused imports, host normalization, provider+scheme validation, security hardening - Remove unused imports (os, field, Any) in config.py - Normalize hosts during load (strip + lowercase) - Validate token/token_env are non-empty strings during load - Validate provider+scheme compatibility during load - Fix extra_headers order: auth headers applied last, cannot be overridden - Remove unused 'tried' variable in http.py - Warn (once) on malformed auth.json instead of silent fallback - URL-encode OAuth2 client credentials body in azure_devops.py - Update 403 message to mention auth.json configuration - Fix registry leak in test_register_duplicate (try/finally) - Fix import style consistency in test_authentication.py - Add azure-cli and azure-ad token acquisition tests (mock subprocess/urlopen) - Add autouse fixture to isolate upgrade tests from real auth.json - 1829 tests passing * fix: reject unknown providers, validate azure-ad fields, strip Authorization from extra_headers - Reject unknown provider keys during auth.json load with clear error message - Validate azure-ad tenant_id/client_id/client_secret_env as non-empty strings - Strip Authorization from extra_headers in both build_request and open_url to prevent accidental or intentional bypass of provider-configured auth - Add tests for unknown provider and incompatible scheme validation - 1831 tests passing * fix: extract shared auth test helpers, global config isolation, align docstring - Move _inject_github_config / make_github_auth_entry to tests/auth_helpers.py to eliminate duplication across test_extensions, test_presets, test_upgrade - Move auth config isolation fixture to global conftest.py (autouse) so ALL tests are isolated from ~/.specify/auth.json, not just test_upgrade - Align load_auth_config docstring with actual behavior: ValueError may be caught by higher-level HTTP helpers that warn and continue unauthenticated - 1831 tests passing * fix: preserve auth header across multi-hop redirect chains - Read Authorization from both headers and unredirected_hdrs in _StripAuthOnRedirect to survive multi-hop chains within allowed hosts - Add test_multi_hop_redirect_within_hosts_preserves_auth - 1832 tests passing * fix: use resolved config path in warning/error messages and patch build_opener in no-network test Agent-Logs-Url: https://github.com/github/spec-kit/sessions/86df9557-54f1-4fe4-a25f-9501cb2356cf Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> * fix: assert full resolved config path in rate-limit output test Agent-Logs-Url: https://github.com/github/spec-kit/sessions/86df9557-54f1-4fe4-a25f-9501cb2356cf Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> * fix: close HTTPError on 401/403, remove _VALID_AUTH_SCHEMES, catch TimeoutExpired, skip POSIX test on Windows, remove unused import Agent-Logs-Url: https://github.com/github/spec-kit/sessions/a1e29737-dd6e-4287-96c1-509e0c96fb21 Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> * fix: use stable ~/.specify/auth.json in rate-limit message, skip POSIX permission check on Windows Agent-Logs-Url: https://github.com/github/spec-kit/sessions/4636bcdb-87ae-45d6-9545-a40e4effd617 Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> * fix: validate host patterns, cache auth config per-process Agent-Logs-Url: https://github.com/github/spec-kit/sessions/889b58a7-7f8c-47e2-8056-931ebcc671cc Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> * fix: clarify _is_valid_host_pattern docstring, clean up test sentinel type Agent-Logs-Url: https://github.com/github/spec-kit/sessions/889b58a7-7f8c-47e2-8056-931ebcc671cc Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> * fix: improve _is_valid_host_pattern docstring and test observability Agent-Logs-Url: https://github.com/github/spec-kit/sessions/889b58a7-7f8c-47e2-8056-931ebcc671cc Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> * Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> * Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
541 lines
20 KiB
Python
541 lines
20 KiB
Python
"""Workflow catalog — discovery, install, and management of workflows.
|
|
|
|
Mirrors the existing extension/preset catalog pattern with:
|
|
- Multi-catalog stack (env var → project → user → built-in)
|
|
- SHA256-hashed per-URL caching with 1-hour TTL
|
|
- Workflow registry for installed workflow tracking
|
|
- Search across all configured catalog sources
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import time
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import yaml
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Errors
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class WorkflowCatalogError(Exception):
|
|
"""Base error for workflow catalog operations."""
|
|
|
|
|
|
class WorkflowValidationError(WorkflowCatalogError):
|
|
"""Validation error for catalog config or workflow data."""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CatalogEntry
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
|
|
class WorkflowCatalogEntry:
|
|
"""Represents a single catalog source in the catalog stack."""
|
|
|
|
url: str
|
|
name: str
|
|
priority: int
|
|
install_allowed: bool
|
|
description: str = ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# WorkflowRegistry
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class WorkflowRegistry:
|
|
"""Manages the registry of installed workflows.
|
|
|
|
Tracks installed workflows and their metadata in
|
|
``.specify/workflows/workflow-registry.json``.
|
|
"""
|
|
|
|
REGISTRY_FILE = "workflow-registry.json"
|
|
SCHEMA_VERSION = "1.0"
|
|
|
|
def __init__(self, project_root: Path) -> None:
|
|
self.project_root = project_root
|
|
self.workflows_dir = project_root / ".specify" / "workflows"
|
|
self.registry_path = self.workflows_dir / self.REGISTRY_FILE
|
|
self.data = self._load()
|
|
|
|
def _load(self) -> dict[str, Any]:
|
|
"""Load registry from disk or create default."""
|
|
if self.registry_path.exists():
|
|
try:
|
|
with open(self.registry_path, encoding="utf-8") as f:
|
|
return json.load(f)
|
|
except (json.JSONDecodeError, ValueError):
|
|
# Corrupted registry file — reset to default
|
|
return {"schema_version": self.SCHEMA_VERSION, "workflows": {}}
|
|
return {"schema_version": self.SCHEMA_VERSION, "workflows": {}}
|
|
|
|
def save(self) -> None:
|
|
"""Persist registry to disk."""
|
|
self.workflows_dir.mkdir(parents=True, exist_ok=True)
|
|
with open(self.registry_path, "w", encoding="utf-8") as f:
|
|
json.dump(self.data, f, indent=2)
|
|
|
|
def add(self, workflow_id: str, metadata: dict[str, Any]) -> None:
|
|
"""Add or update an installed workflow entry."""
|
|
from datetime import datetime, timezone
|
|
|
|
existing = self.data["workflows"].get(workflow_id, {})
|
|
metadata["installed_at"] = existing.get(
|
|
"installed_at", datetime.now(timezone.utc).isoformat()
|
|
)
|
|
metadata["updated_at"] = datetime.now(timezone.utc).isoformat()
|
|
self.data["workflows"][workflow_id] = metadata
|
|
self.save()
|
|
|
|
def remove(self, workflow_id: str) -> bool:
|
|
"""Remove an installed workflow entry. Returns True if found."""
|
|
if workflow_id in self.data["workflows"]:
|
|
del self.data["workflows"][workflow_id]
|
|
self.save()
|
|
return True
|
|
return False
|
|
|
|
def get(self, workflow_id: str) -> dict[str, Any] | None:
|
|
"""Get metadata for an installed workflow."""
|
|
return self.data["workflows"].get(workflow_id)
|
|
|
|
def list(self) -> dict[str, dict[str, Any]]:
|
|
"""Return all installed workflows."""
|
|
return dict(self.data["workflows"])
|
|
|
|
def is_installed(self, workflow_id: str) -> bool:
|
|
"""Check if a workflow is installed."""
|
|
return workflow_id in self.data["workflows"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# WorkflowCatalog
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class WorkflowCatalog:
|
|
"""Manages workflow catalog fetching, caching, and searching.
|
|
|
|
Resolution order for catalog sources:
|
|
1. ``SPECKIT_WORKFLOW_CATALOG_URL`` env var (overrides all)
|
|
2. Project-level ``.specify/workflow-catalogs.yml``
|
|
3. User-level ``~/.specify/workflow-catalogs.yml``
|
|
4. Built-in defaults (official + community)
|
|
"""
|
|
|
|
DEFAULT_CATALOG_URL = (
|
|
"https://raw.githubusercontent.com/github/spec-kit/main/"
|
|
"workflows/catalog.json"
|
|
)
|
|
COMMUNITY_CATALOG_URL = (
|
|
"https://raw.githubusercontent.com/github/spec-kit/main/"
|
|
"workflows/catalog.community.json"
|
|
)
|
|
CACHE_DURATION = 3600 # 1 hour
|
|
|
|
def __init__(self, project_root: Path) -> None:
|
|
self.project_root = project_root
|
|
self.workflows_dir = project_root / ".specify" / "workflows"
|
|
self.cache_dir = self.workflows_dir / ".cache"
|
|
|
|
# -- Catalog resolution -----------------------------------------------
|
|
|
|
def _validate_catalog_url(self, url: str) -> None:
|
|
"""Validate that a catalog URL uses HTTPS (localhost HTTP allowed)."""
|
|
from urllib.parse import urlparse
|
|
|
|
parsed = urlparse(url)
|
|
is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1")
|
|
if parsed.scheme != "https" and not (
|
|
parsed.scheme == "http" and is_localhost
|
|
):
|
|
raise WorkflowValidationError(
|
|
f"Catalog URL must use HTTPS (got {parsed.scheme}://). "
|
|
"HTTP is only allowed for localhost."
|
|
)
|
|
if not parsed.netloc:
|
|
raise WorkflowValidationError(
|
|
"Catalog URL must be a valid URL with a host."
|
|
)
|
|
|
|
def _load_catalog_config(
|
|
self, config_path: Path
|
|
) -> list[WorkflowCatalogEntry] | None:
|
|
"""Load catalog stack configuration from a YAML file."""
|
|
if not config_path.exists():
|
|
return None
|
|
try:
|
|
data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
|
|
except (yaml.YAMLError, OSError, UnicodeError) as exc:
|
|
raise WorkflowValidationError(
|
|
f"Failed to read catalog config {config_path}: {exc}"
|
|
)
|
|
catalogs_data = data.get("catalogs", [])
|
|
if not catalogs_data:
|
|
# Empty catalogs list (e.g. after removing last entry)
|
|
# is valid — fall back to built-in defaults.
|
|
return None
|
|
if not isinstance(catalogs_data, list):
|
|
raise WorkflowValidationError(
|
|
f"Invalid catalog config: 'catalogs' must be a list, "
|
|
f"got {type(catalogs_data).__name__}"
|
|
)
|
|
|
|
entries: list[WorkflowCatalogEntry] = []
|
|
for idx, item in enumerate(catalogs_data):
|
|
if not isinstance(item, dict):
|
|
raise WorkflowValidationError(
|
|
f"Invalid catalog entry at index {idx}: "
|
|
f"expected a mapping, got {type(item).__name__}"
|
|
)
|
|
url = str(item.get("url", "")).strip()
|
|
if not url:
|
|
continue
|
|
self._validate_catalog_url(url)
|
|
try:
|
|
priority = int(item.get("priority", idx + 1))
|
|
except (TypeError, ValueError):
|
|
raise WorkflowValidationError(
|
|
f"Invalid priority for catalog "
|
|
f"'{item.get('name', idx + 1)}': "
|
|
f"expected integer, got {item.get('priority')!r}"
|
|
)
|
|
raw_install = item.get("install_allowed", False)
|
|
if isinstance(raw_install, str):
|
|
install_allowed = raw_install.strip().lower() in (
|
|
"true",
|
|
"yes",
|
|
"1",
|
|
)
|
|
else:
|
|
install_allowed = bool(raw_install)
|
|
entries.append(
|
|
WorkflowCatalogEntry(
|
|
url=url,
|
|
name=str(item.get("name", f"catalog-{idx + 1}")),
|
|
priority=priority,
|
|
install_allowed=install_allowed,
|
|
description=str(item.get("description", "")),
|
|
)
|
|
)
|
|
entries.sort(key=lambda e: e.priority)
|
|
if not entries:
|
|
raise WorkflowValidationError(
|
|
f"Catalog config {config_path} contains {len(catalogs_data)} "
|
|
f"entries but none have valid URLs."
|
|
)
|
|
return entries
|
|
|
|
def get_active_catalogs(self) -> list[WorkflowCatalogEntry]:
|
|
"""Get the ordered list of active catalogs."""
|
|
# 1. Environment variable override
|
|
env_url = os.environ.get("SPECKIT_WORKFLOW_CATALOG_URL", "").strip()
|
|
if env_url:
|
|
self._validate_catalog_url(env_url)
|
|
return [
|
|
WorkflowCatalogEntry(
|
|
url=env_url,
|
|
name="env-override",
|
|
priority=1,
|
|
install_allowed=True,
|
|
description="From SPECKIT_WORKFLOW_CATALOG_URL",
|
|
)
|
|
]
|
|
|
|
# 2. Project-level config
|
|
project_config = self.project_root / ".specify" / "workflow-catalogs.yml"
|
|
project_entries = self._load_catalog_config(project_config)
|
|
if project_entries is not None:
|
|
return project_entries
|
|
|
|
# 3. User-level config
|
|
home = Path.home()
|
|
user_config = home / ".specify" / "workflow-catalogs.yml"
|
|
user_entries = self._load_catalog_config(user_config)
|
|
if user_entries is not None:
|
|
return user_entries
|
|
|
|
# 4. Built-in defaults
|
|
return [
|
|
WorkflowCatalogEntry(
|
|
url=self.DEFAULT_CATALOG_URL,
|
|
name="default",
|
|
priority=1,
|
|
install_allowed=True,
|
|
description="Official workflows",
|
|
),
|
|
WorkflowCatalogEntry(
|
|
url=self.COMMUNITY_CATALOG_URL,
|
|
name="community",
|
|
priority=2,
|
|
install_allowed=False,
|
|
description="Community-contributed workflows (discovery only)",
|
|
),
|
|
]
|
|
|
|
# -- Caching ----------------------------------------------------------
|
|
|
|
def _get_cache_paths(self, url: str) -> tuple[Path, Path]:
|
|
"""Get cache file paths for a URL (hash-based)."""
|
|
url_hash = hashlib.sha256(url.encode()).hexdigest()[:16]
|
|
cache_file = self.cache_dir / f"workflow-catalog-{url_hash}.json"
|
|
meta_file = self.cache_dir / f"workflow-catalog-{url_hash}-meta.json"
|
|
return cache_file, meta_file
|
|
|
|
def _is_url_cache_valid(self, url: str) -> bool:
|
|
"""Check if cached data for a URL is still fresh."""
|
|
_, meta_file = self._get_cache_paths(url)
|
|
if not meta_file.exists():
|
|
return False
|
|
try:
|
|
with open(meta_file, encoding="utf-8") as f:
|
|
meta = json.load(f)
|
|
fetched_at = meta.get("fetched_at", 0)
|
|
return (time.time() - fetched_at) < self.CACHE_DURATION
|
|
except (json.JSONDecodeError, OSError):
|
|
return False
|
|
|
|
def _fetch_single_catalog(
|
|
self, entry: WorkflowCatalogEntry, force_refresh: bool = False
|
|
) -> dict[str, Any]:
|
|
"""Fetch a single catalog, using cache when possible."""
|
|
cache_file, meta_file = self._get_cache_paths(entry.url)
|
|
|
|
if not force_refresh and self._is_url_cache_valid(entry.url):
|
|
try:
|
|
with open(cache_file, encoding="utf-8") as f:
|
|
return json.load(f)
|
|
except (json.JSONDecodeError, OSError):
|
|
pass
|
|
|
|
# Fetch from URL — validate scheme before opening and after redirects
|
|
from urllib.parse import urlparse
|
|
from specify_cli.authentication.http import open_url as _open_url
|
|
|
|
def _validate_catalog_url(url: str) -> None:
|
|
parsed = urlparse(url)
|
|
is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1")
|
|
if parsed.scheme != "https" and not (
|
|
parsed.scheme == "http" and is_localhost
|
|
):
|
|
raise WorkflowCatalogError(
|
|
f"Refusing to fetch catalog from non-HTTPS URL: {url}"
|
|
)
|
|
|
|
_validate_catalog_url(entry.url)
|
|
|
|
try:
|
|
with _open_url(entry.url, timeout=30) as resp:
|
|
_validate_catalog_url(resp.geturl())
|
|
data = json.loads(resp.read().decode("utf-8"))
|
|
except Exception as exc:
|
|
# Fall back to cache if available
|
|
if cache_file.exists():
|
|
try:
|
|
with open(cache_file, encoding="utf-8") as f:
|
|
return json.load(f)
|
|
except (json.JSONDecodeError, ValueError, OSError):
|
|
pass
|
|
raise WorkflowCatalogError(
|
|
f"Failed to fetch catalog from {entry.url}: {exc}"
|
|
) from exc
|
|
|
|
if not isinstance(data, dict):
|
|
raise WorkflowCatalogError(
|
|
f"Catalog from {entry.url} is not a valid JSON object."
|
|
)
|
|
|
|
# Write cache
|
|
self.cache_dir.mkdir(parents=True, exist_ok=True)
|
|
with open(cache_file, "w", encoding="utf-8") as f:
|
|
json.dump(data, f, indent=2)
|
|
with open(meta_file, "w", encoding="utf-8") as f:
|
|
json.dump({"url": entry.url, "fetched_at": time.time()}, f)
|
|
|
|
return data
|
|
|
|
def _get_merged_workflows(
|
|
self, force_refresh: bool = False
|
|
) -> dict[str, dict[str, Any]]:
|
|
"""Merge workflows from all active catalogs (lower priority number wins)."""
|
|
catalogs = self.get_active_catalogs()
|
|
merged: dict[str, dict[str, Any]] = {}
|
|
fetch_errors = 0
|
|
|
|
# Process later/higher-numbered entries first so earlier/lower-numbered
|
|
# entries overwrite them on workflow ID conflicts.
|
|
for entry in reversed(catalogs):
|
|
try:
|
|
data = self._fetch_single_catalog(entry, force_refresh)
|
|
except WorkflowCatalogError:
|
|
fetch_errors += 1
|
|
continue
|
|
workflows = data.get("workflows", {})
|
|
# Handle both dict and list formats
|
|
if isinstance(workflows, dict):
|
|
for wf_id, wf_data in workflows.items():
|
|
if not isinstance(wf_data, dict):
|
|
continue
|
|
wf_data["_catalog_name"] = entry.name
|
|
wf_data["_install_allowed"] = entry.install_allowed
|
|
merged[wf_id] = wf_data
|
|
elif isinstance(workflows, list):
|
|
for wf_data in workflows:
|
|
if not isinstance(wf_data, dict):
|
|
continue
|
|
wf_id = wf_data.get("id", "")
|
|
if wf_id:
|
|
wf_data["_catalog_name"] = entry.name
|
|
wf_data["_install_allowed"] = entry.install_allowed
|
|
merged[wf_id] = wf_data
|
|
if fetch_errors == len(catalogs) and catalogs:
|
|
raise WorkflowCatalogError(
|
|
"All configured catalogs failed to fetch."
|
|
)
|
|
return merged
|
|
|
|
# -- Public API -------------------------------------------------------
|
|
|
|
def search(
|
|
self,
|
|
query: str | None = None,
|
|
tag: str | None = None,
|
|
) -> list[dict[str, Any]]:
|
|
"""Search workflows across all configured catalogs."""
|
|
merged = self._get_merged_workflows()
|
|
results: list[dict[str, Any]] = []
|
|
|
|
for wf_id, wf_data in merged.items():
|
|
wf_data.setdefault("id", wf_id)
|
|
if query:
|
|
q = query.lower()
|
|
searchable = " ".join(
|
|
[
|
|
wf_data.get("name", ""),
|
|
wf_data.get("description", ""),
|
|
wf_data.get("id", ""),
|
|
]
|
|
).lower()
|
|
if q not in searchable:
|
|
continue
|
|
if tag:
|
|
raw_tags = wf_data.get("tags", [])
|
|
tags = raw_tags if isinstance(raw_tags, list) else []
|
|
normalized_tags = [t.lower() for t in tags if isinstance(t, str)]
|
|
if tag.lower() not in normalized_tags:
|
|
continue
|
|
results.append(wf_data)
|
|
return results
|
|
|
|
def get_workflow_info(self, workflow_id: str) -> dict[str, Any] | None:
|
|
"""Get details for a specific workflow from the catalog."""
|
|
merged = self._get_merged_workflows()
|
|
wf = merged.get(workflow_id)
|
|
if wf:
|
|
wf.setdefault("id", workflow_id)
|
|
return wf
|
|
|
|
def get_catalog_configs(self) -> list[dict[str, Any]]:
|
|
"""Return current catalog configuration as a list of dicts."""
|
|
entries = self.get_active_catalogs()
|
|
return [
|
|
{
|
|
"name": e.name,
|
|
"url": e.url,
|
|
"priority": e.priority,
|
|
"install_allowed": e.install_allowed,
|
|
"description": e.description,
|
|
}
|
|
for e in entries
|
|
]
|
|
|
|
def add_catalog(self, url: str, name: str | None = None) -> None:
|
|
"""Add a catalog source to the project-level config."""
|
|
self._validate_catalog_url(url)
|
|
config_path = self.project_root / ".specify" / "workflow-catalogs.yml"
|
|
|
|
data: dict[str, Any] = {"catalogs": []}
|
|
if config_path.exists():
|
|
raw = yaml.safe_load(config_path.read_text(encoding="utf-8"))
|
|
if not isinstance(raw, dict):
|
|
raise WorkflowValidationError(
|
|
"Catalog config file is corrupted (expected a mapping)."
|
|
)
|
|
data = raw
|
|
|
|
catalogs = data.get("catalogs", [])
|
|
if not isinstance(catalogs, list):
|
|
raise WorkflowValidationError(
|
|
"Catalog config 'catalogs' must be a list."
|
|
)
|
|
# Check for duplicate URL (guard against non-dict entries)
|
|
for cat in catalogs:
|
|
if isinstance(cat, dict) and cat.get("url") == url:
|
|
raise WorkflowValidationError(
|
|
f"Catalog URL already configured: {url}"
|
|
)
|
|
|
|
# Derive priority from the highest existing priority + 1
|
|
max_priority = max(
|
|
(cat.get("priority", 0) for cat in catalogs if isinstance(cat, dict)),
|
|
default=0,
|
|
)
|
|
catalogs.append(
|
|
{
|
|
"name": name or f"catalog-{len(catalogs) + 1}",
|
|
"url": url,
|
|
"priority": max_priority + 1,
|
|
"install_allowed": True,
|
|
"description": "",
|
|
}
|
|
)
|
|
data["catalogs"] = catalogs
|
|
|
|
config_path.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(config_path, "w", encoding="utf-8") as f:
|
|
yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True)
|
|
|
|
def remove_catalog(self, index: int) -> str:
|
|
"""Remove a catalog source by index (0-based). Returns the removed name."""
|
|
config_path = self.project_root / ".specify" / "workflow-catalogs.yml"
|
|
if not config_path.exists():
|
|
raise WorkflowValidationError("No catalog config file found.")
|
|
|
|
data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
|
|
if not isinstance(data, dict):
|
|
raise WorkflowValidationError(
|
|
"Catalog config file is corrupted (expected a mapping)."
|
|
)
|
|
catalogs = data.get("catalogs", [])
|
|
if not isinstance(catalogs, list):
|
|
raise WorkflowValidationError(
|
|
"Catalog config 'catalogs' must be a list."
|
|
)
|
|
|
|
if index < 0 or index >= len(catalogs):
|
|
raise WorkflowValidationError(
|
|
f"Catalog index {index} out of range (0-{len(catalogs) - 1})."
|
|
)
|
|
|
|
removed = catalogs.pop(index)
|
|
data["catalogs"] = catalogs
|
|
|
|
with open(config_path, "w", encoding="utf-8") as f:
|
|
yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True)
|
|
|
|
if isinstance(removed, dict):
|
|
return removed.get("name", f"catalog-{index + 1}")
|
|
return f"catalog-{index + 1}"
|