Files
github-spec-kit/src/specify_cli/integration_state.py
Pascal THUET f65d9f9382 feat(integration): add status reporting (#2674)
* feat(integration): add status reporting

* docs(integration): include status in query command docstring

* fix(integration): handle Windows extended-length paths in status containment

On Windows, os.readlink() (and sometimes Path.resolve()) return paths with
the \\?\ extended-length prefix. Comparing such a target against a plain
project root via Path.relative_to() spuriously fails, so an in-project
dangling symlink was classified as `invalid` instead of `missing` — failing
test_status_treats_dangling_symlink_as_missing and the windows-style variant
on the Windows CI runners.

Centralize the containment check in _is_within_project() and strip the
\\?\ / \\?\UNC\ prefix from both sides before relative_to(). Add portable
regression tests for the prefix-stripping helper and the containment contract.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* test(integration): restore top-level pytest import after rebase

A three-way merge / rebase onto main silently dropped the module-level
`import pytest` from test_integration_subcommand.py: main reorganized the
import block without it (using only a local `import pytest as _pytest`),
while this branch added top-level fixtures and `pytest.skip`/`pytest.raises`
usage. The overlapping import-hunk edits resolved by dropping the import,
breaking collection with `NameError: name 'pytest' is not defined` on every
runner. Re-add the import in the third-party group.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* test(integration): fix Windows UNC path assertion in status helper test

`test_strip_extended_length_prefix_normalizes_windows_paths` compared the
str() form of the helper's output against a hand-built string. On Windows,
pathlib renders a UNC root with a trailing separator (`\\server\share\`),
so the exact string match failed there (`\\server\share\` != `\\server\share`)
even though `_strip_extended_length_prefix` behaves correctly — the trailing
separator is irrelevant to the `relative_to` containment check it feeds.

Compare Path objects (semantic equality) instead of exact strings so the
assertion holds on both POSIX and Windows. No production code change needed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* fix(integration): make shared-manifest remediation specify --integration

The fallback `_manifest_suggestion` for the shared `speckit` manifest (used
when no usable default integration is recorded) suggested
`specify init --here --force`, which can trigger interactive integration
selection. For CI/agent consumers of `integration status`, surface an
explicit `--integration <key>` placeholder, matching the file's existing
`<key>` suggestion style.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-06-09 07:20:43 -05:00

253 lines
9.2 KiB
Python

"""State helpers for installed AI agent integrations."""
from __future__ import annotations
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any
INTEGRATION_JSON = ".specify/integration.json"
INTEGRATION_STATE_SCHEMA = 1
@dataclass(frozen=True)
class IntegrationReadError:
"""Structured failure from :func:`try_read_integration_json`.
Callers map ``kind`` to whatever surface they need (loud CLI error,
silent fallback, etc.) without re-implementing the parse/validation logic.
"""
kind: str # "decode", "os", "not_object", "schema_too_new"
detail: str = ""
schema: int | None = None
def _read_integration_json_data(
project_root: Path,
) -> tuple[dict[str, Any] | None, IntegrationReadError | None]:
"""Read raw integration state without normalizing or raising.
Returns ``(data, None)`` when the JSON object is readable and supported,
``(None, None)`` when the file is absent, and ``(None, error)`` for parse,
schema, encoding, or filesystem failures.
"""
path = project_root / INTEGRATION_JSON
# Avoid Path.exists() / Path.is_file() as a pre-check: both return False
# on some OSErrors (e.g. permission errors during stat), which would
# silently treat an unreadable-but-present file as missing. Attempt the
# read directly and distinguish FileNotFoundError (genuinely absent) from
# other OSErrors (which become loud errors via the IntegrationReadError
# path).
try:
raw = path.read_text(encoding="utf-8")
except FileNotFoundError:
return None, None
except IsADirectoryError as exc:
return None, IntegrationReadError(
kind="os",
detail=f"{path} exists but is not a regular file: {exc}",
)
except UnicodeDecodeError as exc:
return None, IntegrationReadError(kind="decode", detail=str(exc))
except OSError as exc:
return None, IntegrationReadError(kind="os", detail=str(exc))
try:
data = json.loads(raw)
except json.JSONDecodeError as exc:
return None, IntegrationReadError(kind="decode", detail=str(exc))
if not isinstance(data, dict):
return None, IntegrationReadError(kind="not_object", detail=type(data).__name__)
schema = data.get("integration_state_schema")
if (
isinstance(schema, int)
and not isinstance(schema, bool)
and schema > INTEGRATION_STATE_SCHEMA
):
return None, IntegrationReadError(kind="schema_too_new", schema=schema)
return data, None
def try_read_integration_json(
project_root: Path,
) -> tuple[dict[str, Any] | None, IntegrationReadError | None]:
"""Parse ``.specify/integration.json`` without raising.
Returns ``(normalized_state, None)`` on success, ``(None, None)`` when the
file does not exist, or ``(None, error)`` for any parse / validation
failure. This helper delegates file I/O and raw JSON validation to
``_read_integration_json_data`` so callers that need raw state can share
the same low-level reader instead of duplicating parse logic.
"""
data, error = _read_integration_json_data(project_root)
if data is None:
return None, error
return normalize_integration_state(data), None
def try_read_integration_json_with_raw(
project_root: Path,
) -> tuple[dict[str, Any] | None, dict[str, Any] | None, IntegrationReadError | None]:
"""Parse ``integration.json`` and return normalized plus raw state.
Returns ``(normalized_state, raw_state, None)`` when the file is readable,
``(None, None, None)`` when it is absent, and ``(None, None, error)`` for
parse, schema, encoding, or filesystem failures.
"""
data, error = _read_integration_json_data(project_root)
if data is None:
return None, None, error
return normalize_integration_state(data), data, None
def clean_integration_key(key: Any) -> str | None:
"""Return a stripped integration key, or None for empty/non-string values."""
if not isinstance(key, str) or not key.strip():
return None
return key.strip()
def dedupe_integration_keys(keys: list[Any]) -> list[str]:
"""Return a de-duplicated list of non-empty integration keys."""
seen: set[str] = set()
deduped: list[str] = []
for key in keys:
clean = clean_integration_key(key)
if clean is None:
continue
if clean in seen:
continue
seen.add(clean)
deduped.append(clean)
return deduped
def normalize_integration_settings(settings: Any) -> dict[str, dict[str, Any]]:
"""Return JSON-safe per-integration runtime settings."""
if not isinstance(settings, dict):
return {}
normalized: dict[str, dict[str, Any]] = {}
for key, value in settings.items():
if not isinstance(key, str) or not key.strip() or not isinstance(value, dict):
continue
clean: dict[str, Any] = {}
script = value.get("script")
if isinstance(script, str) and script.strip():
clean["script"] = script.strip()
raw_options = value.get("raw_options")
if isinstance(raw_options, str):
clean["raw_options"] = raw_options
parsed_options = value.get("parsed_options")
if isinstance(parsed_options, dict):
clean["parsed_options"] = parsed_options
invoke_separator = value.get("invoke_separator")
if isinstance(invoke_separator, str) and invoke_separator.strip():
clean["invoke_separator"] = invoke_separator.strip()
if clean:
normalized[key.strip()] = clean
return normalized
def _normalized_integration_state_schema(value: Any) -> int:
if isinstance(value, int) and not isinstance(value, bool) and value > INTEGRATION_STATE_SCHEMA:
return value
return INTEGRATION_STATE_SCHEMA
def normalize_integration_state(data: dict[str, Any]) -> dict[str, Any]:
"""Normalize legacy and multi-install integration metadata."""
legacy_key = clean_integration_key(data.get("integration"))
default_key = clean_integration_key(data.get("default_integration")) or legacy_key
installed = data.get("installed_integrations")
installed_keys = dedupe_integration_keys(installed if isinstance(installed, list) else [])
if not default_key and installed_keys:
default_key = installed_keys[0]
if default_key and default_key not in installed_keys:
installed_keys.insert(0, default_key)
settings = normalize_integration_settings(data.get("integration_settings"))
normalized = dict(data)
normalized["integration_state_schema"] = _normalized_integration_state_schema(
data.get("integration_state_schema")
)
if default_key:
normalized["integration"] = default_key
normalized["default_integration"] = default_key
else:
normalized.pop("integration", None)
normalized.pop("default_integration", None)
normalized["installed_integrations"] = installed_keys
normalized["integration_settings"] = {
key: settings[key] for key in installed_keys if key in settings
}
return normalized
def default_integration_key(state: dict[str, Any]) -> str | None:
"""Return the default integration key from normalized state."""
key = state.get("default_integration") or state.get("integration")
return clean_integration_key(key)
def installed_integration_keys(state: dict[str, Any]) -> list[str]:
"""Return installed integration keys from normalized state."""
return dedupe_integration_keys(state.get("installed_integrations", []))
def integration_settings(state: dict[str, Any]) -> dict[str, dict[str, Any]]:
"""Return normalized per-integration settings from state."""
return normalize_integration_settings(state.get("integration_settings"))
def integration_setting(state: dict[str, Any], key: str) -> dict[str, Any]:
"""Return stored runtime settings for *key*."""
return dict(integration_settings(state).get(key, {}))
def write_integration_json(
project_root: Path,
*,
version: str,
integration_key: str | None,
installed_integrations: list[str] | None = None,
settings: dict[str, dict[str, Any]] | None = None,
) -> None:
"""Write ``.specify/integration.json`` with legacy-compatible state."""
dest = project_root / INTEGRATION_JSON
dest.parent.mkdir(parents=True, exist_ok=True)
integration_key = clean_integration_key(integration_key)
installed = dedupe_integration_keys(installed_integrations or [])
if integration_key and integration_key not in installed:
installed.insert(0, integration_key)
if not integration_key and installed:
integration_key = installed[0]
normalized_settings = normalize_integration_settings(settings or {})
normalized_settings = {
key: normalized_settings[key] for key in installed if key in normalized_settings
}
data: dict[str, Any] = {
"version": version,
"integration_state_schema": INTEGRATION_STATE_SCHEMA,
"installed_integrations": installed,
"integration_settings": normalized_settings,
}
if integration_key:
data["integration"] = integration_key
data["default_integration"] = integration_key
dest.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8")