feat: Integration catalog — discovery, versioning, and community distribution (#2130)

* Initial plan

* feat: add integration catalog system with catalog files, IntegrationCatalog class, list --catalog flag, upgrade command, integration.yml descriptor, and tests

Agent-Logs-Url: https://github.com/github/spec-kit/sessions/bbcd44e8-c69c-4735-adc1-bdf1ce109184

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>

* fix: address PR review feedback

- Replace empty except with cache cleanup in _fetch_single_catalog
- Log teardown failure warning instead of silent pass in upgrade
- Validate catalog_data and integrations are dicts before use
- Catch OSError/UnicodeError in IntegrationDescriptor._load
- Add isinstance checks for integration/requires/provides/commands
- Enforce semver (X.Y.Z) instead of PEP 440 for descriptor versions
- Fix docstring and CONTRIBUTING.md to match actual block-on-modified behavior
- Restore old manifest on upgrade failure for transactional safety

* refactor: address second round of PR review feedback

- Remove dead cache_file/cache_metadata_file attributes from IntegrationCatalog
- Deduplicate non-default catalog warning (show once per process)
- Anchor version regex to reject partial matches like 1.0.0beta
- Fix 'Preserved modified' message to 'Skipped' for accuracy
- Make upgrade transactional: install new files first, then remove stale
  old-only files, so a failed setup leaves old integration intact
- Update CONTRIBUTING.md: speckit_version validates presence only

* Potential fix for pull request finding 'Empty except'

Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>

* fix: address third round of PR review feedback

- Fix CONTRIBUTING.md JSON examples to show full catalog structure with
  schema_version and integrations wrapper
- Wrap cache writes in try/except OSError for read-only project dirs
- Validate _load_catalog_config YAML root is a dict
- Skip non-dict integ_data entries in merged catalog
- Normalize tags to list-of-strings before filtering/searching
- Add path traversal containment check for stale file deletion
- Clarify docstring: lower numeric priority = higher precedence

* fix: address fourth round of PR review feedback

- Remove unused _write_catalog helper from test file
- Fix comment: tests use monkeypatched urlopen, not file:// URLs
- Wrap cache unlink calls in OSError handler
- Add explicit encoding='utf-8' to all cache read_text/write_text calls
- Restore packaging.version.Version for descriptor version validation
  to align with extension/preset validators
- Add missing goose entry to integrations/catalog.json

* fix: remove unused Path import, add comment to empty except

* fix: validate descriptor root is dict, add shared infra to upgrade

- Add isinstance(self.data, dict) check at start of _validate() so
  non-mapping YAML roots raise IntegrationDescriptorError
- Run _install_shared_infra() and ensure_executable_scripts() in
  upgrade command to match install/switch behavior

* fix: address sixth round of PR review feedback

- Validate integration.id/name/version/description are strings
- Catch TypeError in pkg_version.Version() for non-string versions
- Swap validation order: check catalogs type before emptiness
- Isolate TestActiveCatalogs from user ~/.specify/ via monkeypatch

* fix: address seventh round of PR review feedback

- Update docs: version field uses PEP 440, not semver
- Harden search() against non-string author/name/description fields
- Validate requires.speckit_version is a non-empty string
- Validate command name/file are non-empty strings, file is safe relative path
- Handle stale symlinks in upgrade cleanup
- Document catalog configuration stack in README.md

* fix: validate script entries, remove destructive teardown from upgrade rollback

- Validate provides.scripts entries are non-empty strings with safe relative paths
- Remove teardown from upgrade rollback since setup overwrites in-place —
  teardown would delete files that were working before the upgrade

* fix: use consistent resolved root for stale-file cleanup paths

* fix: validate redirect URL and reject drive-qualified paths

- Validate final URL after redirects with _validate_catalog_url()
- Reject paths with Path.drive or Path.anchor for Windows safety
- Update FakeResponse mocks with geturl() method

* fix: fix docstring backticks, assert file modification in upgrade tests

* docs: clarify directory naming convention for hyphenated integration keys

* fix: correct key type hint, isolate all catalog tests from env

- Fix key parameter type to str | None (defaults to None)
- Add HOME/USERPROFILE monkeypatch and clear SPECKIT_INTEGRATION_CATALOG_URL
  in all TestCatalogFetch tests for full environment isolation

* fix: neutralize catalog table title, handle non-dict cache metadata

* fix: validate requires.tools entries in descriptor

* fix: show discovery-only status, clear metadata files in clear_cache

* fix: catch OSError/UnicodeError in cache read path

* refactor: reuse IntegrationManifest.uninstall for stale-file cleanup

* fix: normalize null tools to empty list in descriptor accessor

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
This commit is contained in:
Copilot
2026-04-16 08:59:32 -05:00
committed by GitHub
parent e0fd355dad
commit 282dd3da56
7 changed files with 1979 additions and 1 deletions

View File

@@ -1775,7 +1775,9 @@ def _resolve_script_type(project_root: Path, script_type: str | None) -> str:
@integration_app.command("list")
def integration_list():
def integration_list(
catalog: bool = typer.Option(False, "--catalog", help="Browse full catalog (built-in + community)"),
):
"""List available integrations and installed status."""
from .integrations import INTEGRATION_REGISTRY
@@ -1790,6 +1792,50 @@ def integration_list():
current = _read_integration_json(project_root)
installed_key = current.get("integration")
if catalog:
from .integrations.catalog import IntegrationCatalog, IntegrationCatalogError
ic = IntegrationCatalog(project_root)
try:
entries = ic.search()
except IntegrationCatalogError as exc:
console.print(f"[red]Error:[/red] {exc}")
raise typer.Exit(1)
if not entries:
console.print("[yellow]No integrations found in catalog.[/yellow]")
return
table = Table(title="Integration Catalog")
table.add_column("ID", style="cyan")
table.add_column("Name")
table.add_column("Version")
table.add_column("Source")
table.add_column("Status")
for entry in sorted(entries, key=lambda e: e["id"]):
eid = entry["id"]
cat_name = entry.get("_catalog_name", "")
install_allowed = entry.get("_install_allowed", True)
if eid == installed_key:
status = "[green]installed[/green]"
elif eid in INTEGRATION_REGISTRY:
status = "built-in"
elif install_allowed is False:
status = "discovery-only"
else:
status = ""
table.add_row(
eid,
entry.get("name", eid),
entry.get("version", ""),
cat_name,
status,
)
console.print(table)
return
table = Table(title="AI Agent Integrations")
table.add_column("Key", style="cyan")
table.add_column("Name")
@@ -2176,6 +2222,120 @@ def integration_switch(
console.print(f"\n[green]✓[/green] Switched to integration '{name}'")
@integration_app.command("upgrade")
def integration_upgrade(
key: str | None = typer.Argument(None, help="Integration key to upgrade (default: current integration)"),
force: bool = typer.Option(False, "--force", help="Force upgrade even if files are modified"),
script: str | None = typer.Option(None, "--script", help="Script type: sh or ps (default: from init-options.json or platform default)"),
integration_options: str | None = typer.Option(None, "--integration-options", help="Options for the integration"),
):
"""Upgrade an integration by reinstalling with diff-aware file handling.
Compares manifest hashes to detect locally modified files and
blocks the upgrade unless --force is used.
"""
from .integrations import get_integration
from .integrations.manifest import IntegrationManifest
project_root = Path.cwd()
specify_dir = project_root / ".specify"
if not specify_dir.exists():
console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)")
console.print("Run this command from a spec-kit project root")
raise typer.Exit(1)
current = _read_integration_json(project_root)
installed_key = current.get("integration")
if key is None:
if not installed_key:
console.print("[yellow]No integration is currently installed.[/yellow]")
raise typer.Exit(0)
key = installed_key
if installed_key and installed_key != key:
console.print(
f"[red]Error:[/red] Integration '{key}' is not the currently installed integration ('{installed_key}')."
)
console.print(f"Use [cyan]specify integration switch {key}[/cyan] instead.")
raise typer.Exit(1)
integration = get_integration(key)
if integration is None:
console.print(f"[red]Error:[/red] Unknown integration '{key}'")
raise typer.Exit(1)
manifest_path = project_root / ".specify" / "integrations" / f"{key}.manifest.json"
if not manifest_path.exists():
console.print(f"[yellow]No manifest found for integration '{key}'. Nothing to upgrade.[/yellow]")
console.print(f"Run [cyan]specify integration install {key}[/cyan] to perform a fresh install.")
raise typer.Exit(0)
try:
old_manifest = IntegrationManifest.load(key, project_root)
except (ValueError, FileNotFoundError) as exc:
console.print(f"[red]Error:[/red] Integration manifest for '{key}' is unreadable: {exc}")
raise typer.Exit(1)
# Detect modified files via manifest hashes
modified = old_manifest.check_modified()
if modified and not force:
console.print(f"[yellow]⚠[/yellow] {len(modified)} file(s) have been modified since installation:")
for rel in modified:
console.print(f" {rel}")
console.print("\nUse [cyan]--force[/cyan] to overwrite modified files, or resolve manually.")
raise typer.Exit(1)
selected_script = _resolve_script_type(project_root, script)
# Ensure shared infrastructure is present (safe to run unconditionally;
# _install_shared_infra merges missing files without overwriting).
_install_shared_infra(project_root, selected_script)
if os.name != "nt":
ensure_executable_scripts(project_root)
# Phase 1: Install new files (overwrites existing; old-only files remain)
console.print(f"Upgrading integration: [cyan]{key}[/cyan]")
new_manifest = IntegrationManifest(key, project_root, version=get_speckit_version())
parsed_options: dict[str, Any] | None = None
if integration_options:
parsed_options = _parse_integration_options(integration, integration_options)
try:
integration.setup(
project_root,
new_manifest,
parsed_options=parsed_options,
script_type=selected_script,
raw_options=integration_options,
)
new_manifest.save()
_write_integration_json(project_root, key, selected_script)
_update_init_options_for_integration(project_root, integration, script_type=selected_script)
except Exception as exc:
# Don't teardown — setup overwrites in-place, so teardown would
# delete files that were working before the upgrade. Just report.
console.print(f"[red]Error:[/red] Failed to upgrade integration: {exc}")
console.print("[yellow]The previous integration files may still be in place.[/yellow]")
raise typer.Exit(1)
# Phase 2: Remove stale files from old manifest that are not in the new one
old_files = old_manifest.files
new_files = new_manifest.files
stale_keys = set(old_files) - set(new_files)
if stale_keys:
stale_manifest = IntegrationManifest(key, project_root, version="stale-cleanup")
stale_manifest._files = {k: old_files[k] for k in stale_keys}
stale_removed, _ = stale_manifest.uninstall(project_root, force=True)
if stale_removed:
console.print(f" Removed {len(stale_removed)} stale file(s) from previous install")
name = (integration.config or {}).get("name", key)
console.print(f"\n[green]✓[/green] Integration '{name}' upgraded successfully")
# ===== Preset Commands =====

View File

@@ -0,0 +1,626 @@
"""Integration catalog — discovery, validation, and upgrade support.
Provides:
- ``IntegrationCatalogEntry`` — single catalog source metadata.
- ``IntegrationCatalog`` — fetches, caches, and searches integration
catalogs (built-in + community).
- ``IntegrationDescriptor`` — loads and validates ``integration.yml``.
"""
from __future__ import annotations
import hashlib
import json
import os
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import yaml
from packaging import version as pkg_version
# ---------------------------------------------------------------------------
# Errors
# ---------------------------------------------------------------------------
class IntegrationCatalogError(Exception):
"""Raised when a catalog operation fails."""
class IntegrationDescriptorError(Exception):
"""Raised when an integration.yml descriptor is invalid."""
# ---------------------------------------------------------------------------
# IntegrationCatalogEntry
# ---------------------------------------------------------------------------
@dataclass
class IntegrationCatalogEntry:
"""Represents a single catalog source in the catalog stack."""
url: str
name: str
priority: int
install_allowed: bool
description: str = ""
# ---------------------------------------------------------------------------
# IntegrationCatalog
# ---------------------------------------------------------------------------
class IntegrationCatalog:
"""Manages integration catalog fetching, caching, and searching."""
DEFAULT_CATALOG_URL = (
"https://raw.githubusercontent.com/github/spec-kit/main/integrations/catalog.json"
)
COMMUNITY_CATALOG_URL = (
"https://raw.githubusercontent.com/github/spec-kit/main/integrations/catalog.community.json"
)
CACHE_DURATION = 3600 # 1 hour
def __init__(self, project_root: Path) -> None:
self.project_root = project_root
self.cache_dir = project_root / ".specify" / "integrations" / ".cache"
# -- URL validation ---------------------------------------------------
@staticmethod
def _validate_catalog_url(url: str) -> None:
from urllib.parse import urlparse
parsed = urlparse(url)
is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1")
if parsed.scheme != "https" and not (parsed.scheme == "http" and is_localhost):
raise IntegrationCatalogError(
f"Catalog URL must use HTTPS (got {parsed.scheme}://). "
"HTTP is only allowed for localhost."
)
if not parsed.netloc:
raise IntegrationCatalogError(
"Catalog URL must be a valid URL with a host."
)
# -- Catalog stack ----------------------------------------------------
def _load_catalog_config(
self, config_path: Path
) -> Optional[List[IntegrationCatalogEntry]]:
"""Load catalog stack from a YAML file.
Returns None when the file does not exist.
Raises:
IntegrationCatalogError: on invalid content
"""
if not config_path.exists():
return None
try:
data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
except (yaml.YAMLError, OSError, UnicodeError) as exc:
raise IntegrationCatalogError(
f"Failed to read catalog config {config_path}: {exc}"
)
if not isinstance(data, dict):
raise IntegrationCatalogError(
f"Invalid catalog config {config_path}: expected a YAML mapping at the root"
)
catalogs_data = data.get("catalogs", [])
if not isinstance(catalogs_data, list):
raise IntegrationCatalogError(
f"Invalid catalog config: 'catalogs' must be a list, "
f"got {type(catalogs_data).__name__}"
)
if not catalogs_data:
raise IntegrationCatalogError(
f"Catalog config {config_path} exists but contains no 'catalogs' entries. "
f"Remove the file to use built-in defaults, or add valid catalog entries."
)
entries: List[IntegrationCatalogEntry] = []
skipped: List[int] = []
for idx, item in enumerate(catalogs_data):
if not isinstance(item, dict):
raise IntegrationCatalogError(
f"Invalid catalog entry at index {idx}: "
f"expected a mapping, got {type(item).__name__}"
)
url = str(item.get("url", "")).strip()
if not url:
skipped.append(idx)
continue
self._validate_catalog_url(url)
try:
priority = int(item.get("priority", idx + 1))
except (TypeError, ValueError):
raise IntegrationCatalogError(
f"Invalid priority for catalog '{item.get('name', idx + 1)}': "
f"expected integer, got {item.get('priority')!r}"
)
raw_install = item.get("install_allowed", False)
if isinstance(raw_install, str):
install_allowed = raw_install.strip().lower() in ("true", "yes", "1")
else:
install_allowed = bool(raw_install)
entries.append(
IntegrationCatalogEntry(
url=url,
name=str(item.get("name", f"catalog-{idx + 1}")),
priority=priority,
install_allowed=install_allowed,
description=str(item.get("description", "")),
)
)
entries.sort(key=lambda e: e.priority)
if not entries:
raise IntegrationCatalogError(
f"Catalog config {config_path} contains {len(catalogs_data)} "
f"entries but none have valid URLs (entries at indices {skipped} "
f"were skipped). Each catalog entry must have a 'url' field."
)
return entries
def get_active_catalogs(self) -> List[IntegrationCatalogEntry]:
"""Return the ordered list of active integration catalogs.
Resolution:
1. ``SPECKIT_INTEGRATION_CATALOG_URL`` env var
2. Project ``.specify/integration-catalogs.yml``
3. User ``~/.specify/integration-catalogs.yml``
4. Built-in defaults (built-in + community)
"""
import sys
env_value = os.environ.get("SPECKIT_INTEGRATION_CATALOG_URL", "").strip()
if env_value:
self._validate_catalog_url(env_value)
if env_value != self.DEFAULT_CATALOG_URL:
if not getattr(self, "_non_default_catalog_warning_shown", False):
print(
"Warning: Using non-default integration catalog. "
"Only use catalogs from sources you trust.",
file=sys.stderr,
)
self._non_default_catalog_warning_shown = True
return [
IntegrationCatalogEntry(
url=env_value,
name="custom",
priority=1,
install_allowed=True,
description="Custom catalog via SPECKIT_INTEGRATION_CATALOG_URL",
)
]
project_cfg = self.project_root / ".specify" / "integration-catalogs.yml"
catalogs = self._load_catalog_config(project_cfg)
if catalogs is not None:
return catalogs
user_cfg = Path.home() / ".specify" / "integration-catalogs.yml"
catalogs = self._load_catalog_config(user_cfg)
if catalogs is not None:
return catalogs
return [
IntegrationCatalogEntry(
url=self.DEFAULT_CATALOG_URL,
name="default",
priority=1,
install_allowed=True,
description="Built-in catalog of installable integrations",
),
IntegrationCatalogEntry(
url=self.COMMUNITY_CATALOG_URL,
name="community",
priority=2,
install_allowed=False,
description="Community-contributed integrations (discovery only)",
),
]
# -- Fetching ---------------------------------------------------------
def _fetch_single_catalog(
self,
entry: IntegrationCatalogEntry,
force_refresh: bool = False,
) -> Dict[str, Any]:
"""Fetch one catalog, with per-URL caching."""
import urllib.error
import urllib.request
url_hash = hashlib.sha256(entry.url.encode()).hexdigest()[:16]
cache_file = self.cache_dir / f"catalog-{url_hash}.json"
cache_meta = self.cache_dir / f"catalog-{url_hash}-metadata.json"
if not force_refresh and cache_file.exists() and cache_meta.exists():
try:
meta = json.loads(cache_meta.read_text(encoding="utf-8"))
cached_at = datetime.fromisoformat(meta.get("cached_at", ""))
if cached_at.tzinfo is None:
cached_at = cached_at.replace(tzinfo=timezone.utc)
age = (datetime.now(timezone.utc) - cached_at).total_seconds()
if age < self.CACHE_DURATION:
return json.loads(cache_file.read_text(encoding="utf-8"))
except (json.JSONDecodeError, ValueError, KeyError, TypeError, AttributeError, OSError, UnicodeError):
# Cache is invalid or stale metadata; delete and refetch from source.
try:
cache_file.unlink(missing_ok=True)
cache_meta.unlink(missing_ok=True)
except OSError:
pass # Cache cleanup is best-effort; ignore deletion failures.
try:
with urllib.request.urlopen(entry.url, timeout=10) as resp:
# Validate final URL after redirects
final_url = resp.geturl()
if final_url != entry.url:
self._validate_catalog_url(final_url)
catalog_data = json.loads(resp.read())
if not isinstance(catalog_data, dict):
raise IntegrationCatalogError(
f"Invalid catalog format from {entry.url}: expected a JSON object"
)
if (
"schema_version" not in catalog_data
or "integrations" not in catalog_data
):
raise IntegrationCatalogError(
f"Invalid catalog format from {entry.url}"
)
if not isinstance(catalog_data.get("integrations"), dict):
raise IntegrationCatalogError(
f"Invalid catalog format from {entry.url}: 'integrations' must be a JSON object"
)
try:
self.cache_dir.mkdir(parents=True, exist_ok=True)
cache_file.write_text(json.dumps(catalog_data, indent=2), encoding="utf-8")
cache_meta.write_text(
json.dumps(
{
"cached_at": datetime.now(timezone.utc).isoformat(),
"catalog_url": entry.url,
},
indent=2,
),
encoding="utf-8",
)
except OSError:
pass # Cache is best-effort; proceed with fetched data
return catalog_data
except urllib.error.URLError as exc:
raise IntegrationCatalogError(
f"Failed to fetch catalog from {entry.url}: {exc}"
)
except json.JSONDecodeError as exc:
raise IntegrationCatalogError(
f"Invalid JSON in catalog from {entry.url}: {exc}"
)
def _get_merged_integrations(
self, force_refresh: bool = False
) -> List[Dict[str, Any]]:
"""Fetch and merge integrations from all active catalogs.
Catalogs are processed in the order returned by
:meth:`get_active_catalogs`. On conflicts, the first catalog in that
order wins (lower numeric priority = higher precedence). Each dict is
annotated with ``_catalog_name`` and ``_install_allowed``.
"""
import sys
active = self.get_active_catalogs()
merged: Dict[str, Dict[str, Any]] = {}
any_success = False
for entry in active:
try:
data = self._fetch_single_catalog(entry, force_refresh)
any_success = True
except IntegrationCatalogError as exc:
print(
f"Warning: Could not fetch catalog '{entry.name}': {exc}",
file=sys.stderr,
)
continue
for integ_id, integ_data in data.get("integrations", {}).items():
if not isinstance(integ_data, dict):
continue
if integ_id not in merged:
merged[integ_id] = {
**integ_data,
"id": integ_id,
"_catalog_name": entry.name,
"_install_allowed": entry.install_allowed,
}
if not any_success and active:
raise IntegrationCatalogError(
"Failed to fetch any integration catalog"
)
return list(merged.values())
# -- Search / info ----------------------------------------------------
def search(
self,
query: Optional[str] = None,
tag: Optional[str] = None,
author: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""Search catalogs for integrations matching the given filters."""
results: List[Dict[str, Any]] = []
for item in self._get_merged_integrations():
author_val = item.get("author", "")
if not isinstance(author_val, str):
author_val = str(author_val) if author_val is not None else ""
if author and author_val.lower() != author.lower():
continue
if tag:
raw_tags = item.get("tags", [])
tags_list = raw_tags if isinstance(raw_tags, list) else []
if tag.lower() not in [t.lower() for t in tags_list if isinstance(t, str)]:
continue
if query:
raw_tags = item.get("tags", [])
tags_list = raw_tags if isinstance(raw_tags, list) else []
name_val = item.get("name", "")
desc_val = item.get("description", "")
id_val = item.get("id", "")
haystack = " ".join(
[
str(name_val) if name_val else "",
str(desc_val) if desc_val else "",
str(id_val) if id_val else "",
]
+ [t for t in tags_list if isinstance(t, str)]
).lower()
if query.lower() not in haystack:
continue
results.append(item)
return results
def get_integration_info(
self, integration_id: str
) -> Optional[Dict[str, Any]]:
"""Return catalog metadata for a single integration, or None."""
for item in self._get_merged_integrations():
if item["id"] == integration_id:
return item
return None
# -- Cache management -------------------------------------------------
def clear_cache(self) -> None:
"""Remove all cached catalog files."""
if self.cache_dir.exists():
for pattern in ("catalog-*.json", "catalog-*-metadata.json"):
for f in self.cache_dir.glob(pattern):
f.unlink(missing_ok=True)
# ---------------------------------------------------------------------------
# IntegrationDescriptor (integration.yml)
# ---------------------------------------------------------------------------
class IntegrationDescriptor:
"""Loads and validates an ``integration.yml`` descriptor.
The descriptor mirrors ``extension.yml`` and ``preset.yml``::
schema_version: "1.0"
integration:
id: "my-agent"
name: "My Agent"
version: "1.0.0"
description: "Integration for My Agent"
author: "my-org"
requires:
speckit_version: ">=0.6.0"
tools: [...]
provides:
commands: [...]
scripts: [...]
"""
SCHEMA_VERSION = "1.0"
REQUIRED_TOP_LEVEL = ["schema_version", "integration", "requires", "provides"]
def __init__(self, descriptor_path: Path) -> None:
self.path = descriptor_path
self.data = self._load(descriptor_path)
self._validate()
# -- Loading ----------------------------------------------------------
@staticmethod
def _load(path: Path) -> dict:
try:
with open(path, "r", encoding="utf-8") as fh:
return yaml.safe_load(fh) or {}
except yaml.YAMLError as exc:
raise IntegrationDescriptorError(f"Invalid YAML in {path}: {exc}")
except FileNotFoundError:
raise IntegrationDescriptorError(f"Descriptor not found: {path}")
except (OSError, UnicodeError) as exc:
raise IntegrationDescriptorError(
f"Unable to read descriptor {path}: {exc}"
)
# -- Validation -------------------------------------------------------
def _validate(self) -> None:
if not isinstance(self.data, dict):
raise IntegrationDescriptorError(
f"Descriptor root must be a YAML mapping, got {type(self.data).__name__}"
)
for field in self.REQUIRED_TOP_LEVEL:
if field not in self.data:
raise IntegrationDescriptorError(
f"Missing required field: {field}"
)
if self.data["schema_version"] != self.SCHEMA_VERSION:
raise IntegrationDescriptorError(
f"Unsupported schema version: {self.data['schema_version']} "
f"(expected {self.SCHEMA_VERSION})"
)
integ = self.data["integration"]
if not isinstance(integ, dict):
raise IntegrationDescriptorError(
"'integration' must be a mapping"
)
for field in ("id", "name", "version", "description"):
if field not in integ:
raise IntegrationDescriptorError(
f"Missing integration.{field}"
)
if not isinstance(integ[field], str):
raise IntegrationDescriptorError(
f"integration.{field} must be a string, got {type(integ[field]).__name__}"
)
if not re.match(r"^[a-z0-9-]+$", integ["id"]):
raise IntegrationDescriptorError(
f"Invalid integration ID '{integ['id']}': "
"must be lowercase alphanumeric with hyphens only"
)
try:
pkg_version.Version(integ["version"])
except (pkg_version.InvalidVersion, TypeError):
raise IntegrationDescriptorError(
f"Invalid version '{integ['version']}'"
)
requires = self.data["requires"]
if not isinstance(requires, dict):
raise IntegrationDescriptorError(
"'requires' must be a mapping"
)
if "speckit_version" not in requires:
raise IntegrationDescriptorError(
"Missing requires.speckit_version"
)
if not isinstance(requires["speckit_version"], str) or not requires["speckit_version"].strip():
raise IntegrationDescriptorError(
"requires.speckit_version must be a non-empty string"
)
tools = requires.get("tools")
if tools is not None:
if not isinstance(tools, list):
raise IntegrationDescriptorError(
"requires.tools must be a list"
)
for tool in tools:
if not isinstance(tool, dict):
raise IntegrationDescriptorError(
"Each requires.tools entry must be a mapping"
)
tool_name = tool.get("name")
if not isinstance(tool_name, str) or not tool_name.strip():
raise IntegrationDescriptorError(
"requires.tools entry 'name' must be a non-empty string"
)
provides = self.data["provides"]
if not isinstance(provides, dict):
raise IntegrationDescriptorError(
"'provides' must be a mapping"
)
commands = provides.get("commands", [])
scripts = provides.get("scripts", [])
if "commands" in provides and not isinstance(commands, list):
raise IntegrationDescriptorError(
"Invalid provides.commands: expected a list"
)
if "scripts" in provides and not isinstance(scripts, list):
raise IntegrationDescriptorError(
"Invalid provides.scripts: expected a list"
)
if not commands and not scripts:
raise IntegrationDescriptorError(
"Integration must provide at least one command or script"
)
for cmd in commands:
if not isinstance(cmd, dict):
raise IntegrationDescriptorError(
"Each command entry must be a mapping"
)
if "name" not in cmd or "file" not in cmd:
raise IntegrationDescriptorError(
"Command entry missing 'name' or 'file'"
)
cmd_name = cmd["name"]
cmd_file = cmd["file"]
if not isinstance(cmd_name, str) or not cmd_name.strip():
raise IntegrationDescriptorError(
"Command entry 'name' must be a non-empty string"
)
if not isinstance(cmd_file, str) or not cmd_file.strip():
raise IntegrationDescriptorError(
"Command entry 'file' must be a non-empty string"
)
if os.path.isabs(cmd_file) or ".." in Path(cmd_file).parts or Path(cmd_file).drive or Path(cmd_file).anchor:
raise IntegrationDescriptorError(
f"Command entry 'file' must be a relative path without '..': {cmd_file}"
)
for script_entry in scripts:
if not isinstance(script_entry, str) or not script_entry.strip():
raise IntegrationDescriptorError(
"Script entry must be a non-empty string"
)
if os.path.isabs(script_entry) or ".." in Path(script_entry).parts or Path(script_entry).drive or Path(script_entry).anchor:
raise IntegrationDescriptorError(
f"Script entry must be a relative path without '..': {script_entry}"
)
# -- Property accessors -----------------------------------------------
@property
def id(self) -> str:
return self.data["integration"]["id"]
@property
def name(self) -> str:
return self.data["integration"]["name"]
@property
def version(self) -> str:
return self.data["integration"]["version"]
@property
def description(self) -> str:
return self.data["integration"]["description"]
@property
def requires_speckit_version(self) -> str:
return self.data["requires"]["speckit_version"]
@property
def commands(self) -> List[Dict[str, Any]]:
return self.data.get("provides", {}).get("commands", [])
@property
def scripts(self) -> List[str]:
return self.data.get("provides", {}).get("scripts", [])
@property
def tools(self) -> List[Dict[str, Any]]:
return self.data.get("requires", {}).get("tools") or []
def get_hash(self) -> str:
"""SHA-256 hash of the descriptor file."""
with open(self.path, "rb") as fh:
return f"sha256:{hashlib.sha256(fh.read()).hexdigest()}"