Compare commits

..

1 Commits

Author SHA1 Message Date
github-actions[bot]
38b800cde3 chore: bump version to 0.10.0 2026-06-09 11:19:45 +00:00
20 changed files with 153 additions and 3019 deletions

View File

@@ -2,20 +2,6 @@
<!-- insert new changelog below this comment -->
## [0.10.1] - 2026-06-09
### Changed
- Update DocGuard — CDD Enforcement extension to v0.25.1 (#2909)
- Update a11y-governance preset to v0.3.0 (#2867)
- docs: document spec persistence models (#2856)
- chore(catalog): bump Linear Integration to v0.3.0 (repo renamed to spec-kit-linear-sync) (#2893)
- chore: update DocGuard extension to v0.25.0 (#2707)
- chore: remove unused open_github_url/_StripAuthOnRedirect from _github_http.py (#2883)
- fix(catalogs): validate extension and preset catalog payload shape (#2621)
- feat(integration): add status reporting (#2674)
- chore: release 0.10.0, begin 0.10.1.dev0 development (#2904)
## [0.10.0] - 2026-06-09
### Changed

View File

@@ -44,7 +44,7 @@ The following community-contributed extensions are available in [`catalog.commun
| Conduct Extension | Orchestrates spec-kit phases via sub-agent delegation to reduce context pollution. | `process` | Read+Write | [spec-kit-conduct-ext](https://github.com/twbrandon7/spec-kit-conduct-ext) |
| Confluence Extension | Create a doc in Confluence summarizing the specifications and planning files | `integration` | Read+Write | [spec-kit-confluence](https://github.com/aaronrsun/spec-kit-confluence) |
| Cost Tracker | Track real LLM dollar cost across SDD workflows — per-feature budgets, per-integration comparison, and finance-ready exports | `visibility` | Read+Write | [spec-kit-cost](https://github.com/Quratulain-bilal/spec-kit-cost) |
| DocGuard — CDD Enforcement | Canonical-Driven Development enforcement. Validates, scores, and traces project documentation with automated checks, AI-driven workflows, and spec-kit hooks. One pinned runtime dependency; pure Node.js otherwise. | `docs` | Read+Write | [spec-kit-docguard](https://github.com/raccioly/docguard) |
| DocGuard — CDD Enforcement | Canonical-Driven Development enforcement. Validates, scores, and traces project documentation with automated checks, AI-driven workflows, and spec-kit hooks. Zero NPM runtime dependencies. | `docs` | Read+Write | [spec-kit-docguard](https://github.com/raccioly/docguard) |
| Extensify | Create and validate extensions and extension catalogs | `process` | Read+Write | [extensify](https://github.com/mnriem/spec-kit-extensions/tree/main/extensify) |
| Fix Findings | Automated analyze-fix-reanalyze loop that resolves spec findings until clean | `code` | Read+Write | [spec-kit-fix-findings](https://github.com/Quratulain-bilal/spec-kit-fix-findings) |
| FixIt Extension | Spec-aware bug fixing — maps bugs to spec artifacts, proposes a plan, applies minimal changes | `code` | Read+Write | [spec-kit-fixit](https://github.com/speckit-community/spec-kit-fixit) |
@@ -56,7 +56,7 @@ The following community-contributed extensions are available in [`catalog.commun
| Iterate | Iterate on spec documents with a two-phase define-and-apply workflow — refine specs mid-implementation and go straight back to building | `docs` | Read+Write | [spec-kit-iterate](https://github.com/imviancagrace/spec-kit-iterate) |
| Jira Integration | Create Jira Epics, Stories, and Issues from spec-kit specifications and task breakdowns with configurable hierarchy and custom field support | `integration` | Read+Write | [spec-kit-jira](https://github.com/mbachorik/spec-kit-jira) |
| Learning Extension | Generate educational guides from implementations and enhance clarifications with mentoring context | `docs` | Read+Write | [spec-kit-learn](https://github.com/imviancagrace/spec-kit-learn) |
| Linear Integration | Mirror spec-kit feature directories into Linear (filesystem → Linear, reconcile-based, unidirectional). | `integration` | Read+Write | [spec-kit-linear-sync](https://github.com/ashbrener/spec-kit-linear-sync) |
| Linear Integration | Mirror spec-kit feature directories into Linear (filesystem → Linear, reconcile-based, unidirectional). | `integration` | Read+Write | [spec-kit-linear](https://github.com/ashbrener/spec-kit-linear) |
| MAQA — Multi-Agent & Quality Assurance | Coordinator → feature → QA agent workflow with parallel worktree-based implementation. Language-agnostic. Auto-detects installed board plugins. Optional CI gate. | `process` | Read+Write | [spec-kit-maqa-ext](https://github.com/GenieRobot/spec-kit-maqa-ext) |
| MAQA Azure DevOps Integration | Azure DevOps Boards integration for MAQA — syncs User Stories and Task children as features progress | `integration` | Read+Write | [spec-kit-maqa-azure-devops](https://github.com/GenieRobot/spec-kit-maqa-azure-devops) |
| MAQA CI/CD Gate | Auto-detects GitHub Actions, CircleCI, GitLab CI, and Bitbucket Pipelines. Blocks QA handoff until pipeline is green. | `process` | Read+Write | [spec-kit-maqa-ci](https://github.com/GenieRobot/spec-kit-maqa-ci) |

View File

@@ -7,7 +7,7 @@ The following community-contributed presets customize how Spec Kit behaves — o
| Preset | Purpose | Provides | Requires | URL |
|--------|---------|----------|----------|-----|
| A11Y Governance | Adds WCAG 2.2 AA accessibility checks, bilingual DE/EN delivery, CEFR-B2 readability, CLI accessibility, inclusive-content guidance, and didactic inline-code-comment review | 10 templates, 3 commands | — | [spec-kit-preset-a11y-governance](https://github.com/hindermath/spec-kit-preset-a11y-governance) |
| A11Y Governance | Adds WCAG 2.2 AA accessibility checks, bilingual DE/EN delivery, CEFR-B2 readability, CLI accessibility, and inclusive-content guidance | 9 templates, 3 commands | — | [spec-kit-preset-a11y-governance](https://github.com/hindermath/spec-kit-preset-a11y-governance) |
| Agent Parity Governance | Keeps shared AI-agent instructions aligned and adds agent-neutral Spec Kit model-routing guidance across project-defined agent guidance surfaces | 9 templates, 3 commands | — | [spec-kit-preset-agent-parity-governance](https://github.com/hindermath/spec-kit-preset-agent-parity-governance) |
| AIDE In-Place Migration | Adapts the AIDE extension workflow for in-place technology migrations (X → Y pattern) — adds migration objectives, verification gates, knowledge documents, and behavioral equivalence criteria | 2 templates, 8 commands | AIDE extension | [spec-kit-presets](https://github.com/mnriem/spec-kit-presets) |
| Architecture Governance | Adds secure architecture governance: trust boundaries, threat modeling, STRIDE/CAPEC, S-ADRs, Zero Trust applicability, and OWASP SAMM | 11 templates, 3 commands | — | [spec-kit-preset-architecture-governance](https://github.com/hindermath/spec-kit-preset-architecture-governance) |

View File

@@ -11,11 +11,6 @@ Spec-Driven Development is a structured process that emphasizes:
- **Multi-step refinement** rather than one-shot code generation from prompts
- **Heavy reliance** on advanced AI model capabilities for specification interpretation
Spec Kit does not prescribe how teams preserve or mutate `spec.md`, `plan.md`,
and `tasks.md` after requirements change. See
[Spec Persistence Models](spec-persistence.md) for three common ways to manage
those artifacts over time.
## Development Phases
| Phase | Focus | Key Activities |

View File

@@ -1,107 +0,0 @@
# Spec Persistence Models
Spec Kit intentionally leaves teams in control of what happens to `spec.md`,
`plan.md`, and `tasks.md` after requirements change. The toolkit gives you a
repeatable workflow, but it does not force one artifact maintenance strategy.
This page names three common models so teams can make that choice explicit.
None is the default, and none is required by Spec Kit.
## Two Separate Questions
Spec-driven development has a temporal question: how long should the
specification matter? One
[overview of SDD tooling](https://martinfowler.com/articles/exploring-gen-ai/sdd-3-tools.html)
frames that lifecycle in three levels:
- **Spec-first**: write a spec before coding, then allow it to be discarded.
- **Spec-anchored**: keep the spec after implementation and use it for future
changes.
- **Spec-as-source**: treat the spec as the only human-edited source and
regenerate implementation artifacts from it.
Spec Kit also exposes a second question: what happens to the artifact set when
requirements change? The models below describe that mutation strategy.
## Flow-Back Spec
Use flow-back when `spec.md`, `plan.md`, `tasks.md`, and the implementation are
all allowed to inform each other.
In this model, edits can begin in any artifact. A developer might update
`tasks.md` during implementation, revise `plan.md` after a technical discovery,
or adjust `spec.md` after a product clarification. The team then reconciles the
artifact set manually so the final project history still makes sense.
Flow-back works well when:
- the team is small enough to notice and reconcile drift quickly
- implementation discoveries are expected to reshape the original plan
- speed matters more than preserving each intermediate decision as immutable
history
The main risk is silent divergence. If the team changes lower-level artifacts
without reflecting the decision back into `spec.md`, future contributors may
not know which artifact to trust.
## Flow-Forward Spec
Use flow-forward when each feature directory should remain a historical record.
In this model, completed artifacts are treated as immutable. When requirements
change, the team creates a new feature directory instead of mutating the
existing `spec.md`, `plan.md`, or `tasks.md`. The older directory remains useful
for audit, comparison, or explaining how the project reached its current state.
Flow-forward works well when:
- auditability and traceability matter
- features are well-scoped and rarely revisited in place
- the team wants a clear sequence of requirement changes over time
The main tradeoff is duplication. Related decisions can be spread across
multiple feature directories, so teams need naming, linking, or review habits
that make the lineage easy to follow.
## Living Spec
Use living spec when `spec.md` is the contract and the other artifacts are
derived from it.
In this model, teams update `spec.md` first and then regenerate or revise
`plan.md` and `tasks.md` from that source. The plan and task list are still
valuable, but they are treated as disposable derivations rather than permanent
sources of truth.
Living spec works well when:
- the product contract is stable enough to own the workflow
- the team is comfortable regenerating derived artifacts after spec changes
- consistency between requirements and implementation matters more than keeping
every intermediate plan intact
The main risk is losing useful implementation rationale if derived artifacts are
discarded without preserving important decisions elsewhere.
## Choosing a Model
The model is a team convention, not a CLI setting. A project can even use
different models in different areas, as long as contributors know which one
applies.
| Model | Mutation rule | Best fit | Watch out for |
|---|---|---|---|
| Flow-back spec | Edit any artifact, then reconcile | Fast iteration and close collaboration | Silent drift between artifacts |
| Flow-forward spec | Create a new feature directory for new requirements | Audit trails and historical clarity | Duplicate or fragmented context |
| Living spec | Edit `spec.md`; regenerate derived artifacts | Spec as contract | Lost rationale in regenerated files |
If your team has not chosen a model yet, start by answering two questions:
1. Should completed feature directories be historical records or editable work
areas?
2. Is `spec.md` the single source of truth, or are `plan.md` and `tasks.md`
allowed to become co-equal sources?
Once those answers are clear, document the convention in your project
constitution or team onboarding notes so future contributors know how to handle
changes.

View File

@@ -126,27 +126,6 @@ specify integration upgrade [<key>]
Reinstalls an installed integration with updated templates and commands (e.g., after upgrading Spec Kit). Defaults to the default integration; if a key is provided, it must be one of the installed integrations. Detects locally modified files and blocks the upgrade unless `--force` is used. Stale files from the previous install that are no longer needed are removed automatically. Shared templates stay aligned with the default integration even when upgrading a non-default integration.
## Report Integration Status
```bash
specify integration status
specify integration status --json
```
Reports the current project's integration status without changing files. The
status report includes the default integration, installed integrations,
multi-install safety, missing managed files, modified managed files, invalid
manifest paths, shared Spec Kit infrastructure health, unchecked manifests, and
the target integration for default-sensitive shared templates. The JSON form is
intended for CI and coding agents that need stable machine-readable status data;
it also reports the raw recorded integrations and the integration manifests that
were checked when state repair heuristics differ from the recorded file.
The command exits 0 when the report status is `ok` or `warning`; it exits 1
only when the report status is `error`. In JSON output, `multi_install_safe`
is `null` when no installed integration set can be evaluated, such as when the
integration state is missing, unreadable, lacks a valid recorded integration
list, or records no installed integrations.
## Integration-Specific Options
Some integrations accept additional options via `--integration-options`:

View File

@@ -41,8 +41,6 @@
items:
- name: What is SDD?
href: concepts/sdd.md
- name: Spec Persistence Models
href: concepts/spec-persistence.md
# Development workflows
- name: Development

View File

@@ -1,6 +1,6 @@
{
"schema_version": "1.0",
"updated_at": "2026-06-09T00:00:00Z",
"updated_at": "2026-06-08T00:00:00Z",
"catalog_url": "https://raw.githubusercontent.com/github/spec-kit/main/extensions/catalog.community.json",
"extensions": {
"aide": {
@@ -438,7 +438,7 @@
"updated_at": "2026-04-10T00:00:00Z"
},
"brownkit": {
"name": "BrownKit Brownfield Discovery for Spec-Kit",
"name": "BrownKit \u2014 Brownfield Discovery for Spec-Kit",
"id": "brownkit",
"description": "Evidence-driven capability discovery, security and QA risk assessment for existing codebases.",
"author": "Maksim Shautsou",
@@ -849,10 +849,10 @@
"docguard": {
"name": "DocGuard — CDD Enforcement",
"id": "docguard",
"description": "Canonical-Driven Development enforcement. Validates, scores, and traces project documentation with automated checks, AI-driven workflows, and spec-kit hooks. One pinned runtime dependency; pure Node.js otherwise.",
"description": "Canonical-Driven Development enforcement. Validates, scores, and traces project documentation with automated checks, AI-driven workflows, and spec-kit hooks. Zero NPM runtime dependencies.",
"author": "raccioly",
"version": "0.25.1",
"download_url": "https://github.com/raccioly/docguard/releases/download/v0.25.1/spec-kit-docguard-v0.25.1.zip",
"version": "0.9.11",
"download_url": "https://github.com/raccioly/docguard/releases/download/v0.9.11/spec-kit-docguard-v0.9.11.zip",
"repository": "https://github.com/raccioly/docguard",
"homepage": "https://www.npmjs.com/package/docguard-cli",
"documentation": "https://github.com/raccioly/docguard/blob/main/extensions/spec-kit-docguard/README.md",
@@ -886,7 +886,7 @@
"downloads": 0,
"stars": 0,
"created_at": "2026-03-13T00:00:00Z",
"updated_at": "2026-06-09T00:00:00Z"
"updated_at": "2026-03-18T18:53:31Z"
},
"doctor": {
"name": "Project Health Check",
@@ -1251,12 +1251,12 @@
"id": "linear",
"description": "Mirror spec-kit feature directories into Linear (filesystem → Linear, reconcile-based, unidirectional).",
"author": "Ash Brener",
"version": "0.3.0",
"download_url": "https://github.com/ashbrener/spec-kit-linear-sync/archive/refs/tags/v0.3.0.zip",
"repository": "https://github.com/ashbrener/spec-kit-linear-sync",
"homepage": "https://github.com/ashbrener/spec-kit-linear-sync",
"documentation": "https://github.com/ashbrener/spec-kit-linear-sync/blob/main/README.md",
"changelog": "https://github.com/ashbrener/spec-kit-linear-sync/releases",
"version": "0.2.0",
"download_url": "https://github.com/ashbrener/spec-kit-linear/archive/refs/tags/v0.2.0.zip",
"repository": "https://github.com/ashbrener/spec-kit-linear",
"homepage": "https://github.com/ashbrener/spec-kit-linear",
"documentation": "https://github.com/ashbrener/spec-kit-linear/blob/main/README.md",
"changelog": "https://github.com/ashbrener/spec-kit-linear/releases",
"license": "MIT",
"requires": {
"speckit_version": ">=0.1.0"
@@ -1277,7 +1277,7 @@
"downloads": 0,
"stars": 0,
"created_at": "2026-06-01T00:00:00Z",
"updated_at": "2026-06-08T00:00:00Z"
"updated_at": "2026-06-01T00:00:00Z"
},
"m365": {
"name": "Microsoft 365 Integration",
@@ -3607,4 +3607,4 @@
"updated_at": "2026-04-13T00:00:00Z"
}
}
}
}

View File

@@ -1,16 +1,16 @@
{
"schema_version": "1.0",
"updated_at": "2026-06-05T00:00:00Z",
"updated_at": "2026-06-03T00:00:00Z",
"catalog_url": "https://raw.githubusercontent.com/github/spec-kit/main/presets/catalog.community.json",
"presets": {
"a11y-governance": {
"name": "A11Y Governance",
"id": "a11y-governance",
"version": "0.3.0",
"description": "Adds accessibility, bilingual DE/EN delivery, CEFR-B2 readability, inclusive-content governance, and didactic inline-code-comment review to Spec Kit.",
"version": "0.2.0",
"description": "Adds accessibility, bilingual DE/EN delivery, CEFR-B2 readability, and inclusive-content governance to Spec Kit.",
"author": "Thorsten Hindermann",
"repository": "https://github.com/hindermath/spec-kit-preset-a11y-governance",
"download_url": "https://github.com/hindermath/spec-kit-preset-a11y-governance/archive/refs/tags/v0.3.0.zip",
"download_url": "https://github.com/hindermath/spec-kit-preset-a11y-governance/archive/refs/tags/v0.2.0.zip",
"homepage": "https://github.com/hindermath/spec-kit-preset-a11y-governance",
"documentation": "https://github.com/hindermath/spec-kit-preset-a11y-governance/blob/main/README.md",
"license": "MIT",
@@ -18,7 +18,7 @@
"speckit_version": ">=0.8.0"
},
"provides": {
"templates": 10,
"templates": 9,
"commands": 3
},
"tags": [
@@ -29,7 +29,7 @@
"inclusion"
],
"created_at": "2026-04-27T00:00:00Z",
"updated_at": "2026-06-05T00:00:00Z"
"updated_at": "2026-04-27T00:00:00Z"
},
"agent-parity-governance": {
"name": "Agent Parity Governance",

View File

@@ -1,6 +1,6 @@
[project]
name = "specify-cli"
version = "0.10.1"
version = "0.10.0"
description = "Specify CLI, part of GitHub Spec Kit. A tool to bootstrap your projects for Spec-Driven Development (SDD)."
requires-python = ">=3.11"
dependencies = [

View File

@@ -1,11 +1,9 @@
"""Shared GitHub HTTP request helpers.
"""Shared GitHub-authenticated HTTP helpers.
Provides ``build_github_request()`` for attaching GITHUB_TOKEN / GH_TOKEN
credentials to requests targeting GitHub-hosted domains, and
``resolve_github_release_asset_api_url()`` — used by extensions, presets,
and workflow URL resolution — to translate browser release-download URLs
into GitHub REST API asset URLs. Authenticated downloads themselves go
through the config-driven helpers in :mod:`specify_cli.authentication.http`.
Used by both ExtensionCatalog and PresetCatalog to attach
GITHUB_TOKEN / GH_TOKEN credentials to requests targeting
GitHub-hosted domains, while preventing token leakage to
third-party hosts on redirects.
"""
import os
@@ -56,6 +54,28 @@ def build_github_request(url: str) -> urllib.request.Request:
return urllib.request.Request(url, headers=headers)
class _StripAuthOnRedirect(urllib.request.HTTPRedirectHandler):
"""Redirect handler that drops the Authorization header when leaving GitHub.
Prevents token leakage to CDNs or other third-party hosts that GitHub
may redirect to (e.g. S3 for release asset downloads, objects.githubusercontent.com).
Auth is preserved as long as the redirect target remains within GITHUB_HOSTS.
"""
def redirect_request(self, req, fp, code, msg, headers, newurl):
original_auth = req.get_header("Authorization")
new_req = super().redirect_request(req, fp, code, msg, headers, newurl)
if new_req is not None:
hostname = (urlparse(newurl).hostname or "").lower()
if hostname in GITHUB_HOSTS:
if original_auth:
new_req.add_unredirected_header("Authorization", original_auth)
else:
new_req.headers.pop("Authorization", None)
new_req.unredirected_hdrs.pop("Authorization", None)
return new_req
def resolve_github_release_asset_api_url(
download_url: str,
open_url_fn: Callable,
@@ -127,3 +147,20 @@ def resolve_github_release_asset_api_url(
return str(asset["url"])
return None
def open_github_url(url: str, timeout: int = 10):
"""Open a URL with GitHub auth, stripping the header on cross-host redirects.
When the request carries an Authorization header, a custom redirect
handler drops that header if the redirect target is not a GitHub-owned
domain, preventing token leakage to CDNs or other third-party hosts
that GitHub may redirect to (e.g. S3 for release asset downloads).
"""
req = build_github_request(url)
if not req.get_header("Authorization"):
return urllib.request.urlopen(req, timeout=timeout)
opener = urllib.request.build_opener(_StripAuthOnRedirect)
return opener.open(req, timeout=timeout)

View File

@@ -1905,44 +1905,6 @@ class ExtensionCatalog(CatalogStackBase):
download_url, self._open_url, timeout=timeout
)
def _validate_catalog_payload(self, catalog_data: Any, url: str) -> None:
"""Validate a parsed catalog payload's shape.
Applied to both network-fetched and cache-loaded payloads so a
once-poisoned cache (older spec-kit version, manual edit, upstream
served a bad payload before the network-side guards were added)
cannot re-crash ``_get_merged_extensions`` on subsequent calls.
Checking only key presence would let a payload like
``{"extensions": []}`` or ``{"extensions": null}`` slip through
here and then crash with ``AttributeError: 'list' object has no
attribute 'items'`` deep inside ``_get_merged_extensions``. The
sibling integration catalog reader already guards both the root
object and the nested mapping (see ``integrations/catalog.py``);
the extension catalog must stay consistent so a malformed payload
surfaces as the user-facing ``Invalid catalog format`` error
instead of a raw Python traceback.
Args:
catalog_data: Parsed JSON payload from the catalog source.
url: Source URL — used in the error message so the user can
tell which catalog in a multi-catalog stack is malformed.
Raises:
ExtensionError: If the payload's shape is invalid.
"""
if not isinstance(catalog_data, dict):
raise ExtensionError(
f"Invalid catalog format from {url}: expected a JSON object"
)
if "schema_version" not in catalog_data or "extensions" not in catalog_data:
raise ExtensionError(f"Invalid catalog format from {url}")
if not isinstance(catalog_data.get("extensions"), dict):
raise ExtensionError(
f"Invalid catalog format from {url}: "
"'extensions' must be a JSON object"
)
def get_active_catalogs(self) -> List[CatalogEntry]:
"""Get the ordered list of active catalogs.
@@ -2058,51 +2020,21 @@ class ExtensionCatalog(CatalogStackBase):
is_valid = False
if not force_refresh and cache_file.exists() and cache_meta_file.exists():
try:
metadata = json.loads(
cache_meta_file.read_text(encoding="utf-8")
)
metadata = json.loads(cache_meta_file.read_text())
cached_at = datetime.fromisoformat(metadata.get("cached_at", ""))
if cached_at.tzinfo is None:
cached_at = cached_at.replace(tzinfo=timezone.utc)
age = (datetime.now(timezone.utc) - cached_at).total_seconds()
is_valid = age < self.CACHE_DURATION
except (
json.JSONDecodeError,
OSError,
UnicodeError,
ValueError,
KeyError,
TypeError,
AttributeError,
):
# Cache validity is best-effort: invalid/missing metadata
# fields, an unreadable metadata file (permissions / disk),
# a wrongly-encoded metadata file (written by a tool using
# the system locale codec), or a metadata payload that
# parses to a non-mapping like ``[]`` or ``"oops"`` (so
# ``metadata.get(...)`` raises ``AttributeError``) all
# degrade to "cache invalid" so the caller falls through
# to a network refetch instead of crashing.
except (json.JSONDecodeError, ValueError, KeyError, TypeError):
# If metadata is invalid or missing expected fields, treat cache as invalid
pass
# Use cache if valid. A previously-cached payload must clear the
# same shape checks as a freshly-fetched one — otherwise a once-
# poisoned cache (older spec-kit version, manual edit, upstream
# served a bad payload before the network-side guards were added)
# would re-crash on every invocation despite the cache being
# "valid" by age. If validation fails on the cached read, fall
# through to the network fetch path so the cache gets refreshed.
# Use cache if valid
if is_valid:
try:
cached_data = json.loads(cache_file.read_text(encoding="utf-8"))
self._validate_catalog_payload(cached_data, entry.url)
return cached_data
except (json.JSONDecodeError, OSError, UnicodeError, ExtensionError):
# Cache is best-effort: a JSON-decode failure, an OS-level
# read failure (permissions / disk / handle limit), or a
# text-encoding failure on a cache file written by an older
# client all fall through to the network fetch path. Only
# the network failure is surfaced to the caller.
return json.loads(cache_file.read_text())
except json.JSONDecodeError:
pass
# Fetch from network
@@ -2110,32 +2042,16 @@ class ExtensionCatalog(CatalogStackBase):
with self._open_url(entry.url, timeout=10) as response:
catalog_data = json.loads(response.read())
self._validate_catalog_payload(catalog_data, entry.url)
if "schema_version" not in catalog_data or "extensions" not in catalog_data:
raise ExtensionError(f"Invalid catalog format from {entry.url}")
# Save to cache. Both files are explicitly UTF-8 to match the
# ``read_text(encoding="utf-8")`` on the read side and the
# ``integrations/catalog.py`` precedent (see the cache write
# helpers in ``CatalogCache`` there). Without this, platforms
# whose default encoding isn't UTF-8 would write locale-encoded
# bytes that the read path can't decode, forcing an unnecessary
# network refetch on every invocation. The write itself is
# best-effort, matching the read side: an unwritable cache dir
# (read-only checkout, permissions) must not fail a fetch whose
# payload was already fetched and validated.
try:
self.cache_dir.mkdir(parents=True, exist_ok=True)
cache_file.write_text(
json.dumps(catalog_data, indent=2), encoding="utf-8"
)
cache_meta_file.write_text(
json.dumps({
"cached_at": datetime.now(timezone.utc).isoformat(),
"catalog_url": entry.url,
}, indent=2),
encoding="utf-8",
)
except OSError:
pass # Cache is best-effort; proceed with fetched data
# Save to cache
self.cache_dir.mkdir(parents=True, exist_ok=True)
cache_file.write_text(json.dumps(catalog_data, indent=2))
cache_meta_file.write_text(json.dumps({
"cached_at": datetime.now(timezone.utc).isoformat(),
"catalog_url": entry.url,
}, indent=2))
return catalog_data
@@ -2182,16 +2098,6 @@ class ExtensionCatalog(CatalogStackBase):
continue
for ext_id, ext_data in catalog_data.get("extensions", {}).items():
# Per-entry guard: ``_fetch_single_catalog`` already validates
# that ``catalog_data["extensions"]`` is a mapping, but it
# does not (and should not) validate every entry shape there
# — one malformed entry shouldn't poison an otherwise valid
# catalog. Skip non-mapping entries here so a payload like
# ``{"extensions": {"foo": [], "bar": {...}}}`` still merges
# the valid entries without crashing on ``**ext_data``.
# Mirrors ``integrations/catalog.py:245``.
if not isinstance(ext_data, dict):
continue
if ext_id not in merged: # Higher-priority catalog wins
merged[ext_id] = {
**ext_data,
@@ -2208,12 +2114,6 @@ class ExtensionCatalog(CatalogStackBase):
def is_cache_valid(self) -> bool:
"""Check if cached catalog is still valid.
Returns ``False`` for any read/decoding failure on the metadata
file (missing fields, malformed JSON, permissions / disk errors,
wrong text encoding) so callers fall through to a network refetch
instead of crashing. Treating cache validity as best-effort
matches the contract used by the per-URL cache check below.
Returns:
True if cache exists and is within cache duration
"""
@@ -2221,28 +2121,13 @@ class ExtensionCatalog(CatalogStackBase):
return False
try:
metadata = json.loads(
self.cache_metadata_file.read_text(encoding="utf-8")
)
metadata = json.loads(self.cache_metadata_file.read_text())
cached_at = datetime.fromisoformat(metadata.get("cached_at", ""))
if cached_at.tzinfo is None:
cached_at = cached_at.replace(tzinfo=timezone.utc)
age_seconds = (datetime.now(timezone.utc) - cached_at).total_seconds()
return age_seconds < self.CACHE_DURATION
except (
json.JSONDecodeError,
OSError,
UnicodeError,
ValueError,
KeyError,
TypeError,
AttributeError,
):
# ``AttributeError`` covers the case where the metadata file is
# valid JSON but parses to a non-mapping (``[]``, ``"oops"``,
# ``42``) so ``metadata.get(...)`` would otherwise crash. All
# decode/shape failures degrade to "cache invalid" so the
# caller falls through to a network refetch.
except (json.JSONDecodeError, ValueError, KeyError, TypeError):
return False
def fetch_catalog(self, force_refresh: bool = False) -> Dict[str, Any]:
@@ -2257,62 +2142,36 @@ class ExtensionCatalog(CatalogStackBase):
Raises:
ExtensionError: If catalog cannot be fetched
"""
catalog_url = self.get_catalog_url()
# Check the cache first unless ``force_refresh`` was requested,
# then fall through to a network fetch. Match the
# ``_fetch_single_catalog`` cache contract: a poisoned or
# unreadable cache silently falls through to a network refetch
# rather than crashing the caller. ``_validate_catalog_payload``
# is reused here so a cache written by an older client
# (pre-validation) is rejected and refreshed instead of returning
# the stale malformed payload. ``is_cache_valid`` itself swallows
# OSError/UnicodeError on the metadata read, so a cache-validity
# check can't crash this method before the read-side fallback
# runs.
# Check cache first unless force refresh
if not force_refresh and self.is_cache_valid():
try:
cached_data = json.loads(self.cache_file.read_text(encoding="utf-8"))
self._validate_catalog_payload(cached_data, catalog_url)
return cached_data
except (json.JSONDecodeError, OSError, UnicodeError, ExtensionError):
return json.loads(self.cache_file.read_text())
except json.JSONDecodeError:
pass # Fall through to network fetch
# Fetch from network
catalog_url = self.get_catalog_url()
try:
import urllib.error
with self._open_url(catalog_url, timeout=10) as response:
catalog_data = json.loads(response.read())
# Validate catalog structure. Reuses the same helper as
# ``_fetch_single_catalog`` so all three branches (root type,
# missing keys, nested-mapping type) stay consistent.
self._validate_catalog_payload(catalog_data, catalog_url)
# Validate catalog structure
if "schema_version" not in catalog_data or "extensions" not in catalog_data:
raise ExtensionError("Invalid catalog format")
# Save to cache. Explicit UTF-8 on both writes mirrors the
# ``read_text(encoding="utf-8")`` on the read side and the
# ``integrations/catalog.py`` precedent — otherwise platforms
# whose default encoding isn't UTF-8 would write locale-encoded
# bytes the read path can't decode, forcing an unnecessary
# refetch on every invocation. Like the read side, the write
# is best-effort: an unwritable cache dir must not abort a
# fetch whose payload was already fetched and validated.
try:
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.cache_file.write_text(
json.dumps(catalog_data, indent=2), encoding="utf-8"
)
# Save to cache
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.cache_file.write_text(json.dumps(catalog_data, indent=2))
# Save cache metadata
metadata = {
"cached_at": datetime.now(timezone.utc).isoformat(),
"catalog_url": catalog_url,
}
self.cache_metadata_file.write_text(
json.dumps(metadata, indent=2), encoding="utf-8"
)
except OSError:
pass # Cache is best-effort; proceed with fetched data
# Save cache metadata
metadata = {
"cached_at": datetime.now(timezone.utc).isoformat(),
"catalog_url": catalog_url,
}
self.cache_metadata_file.write_text(json.dumps(metadata, indent=2))
return catalog_data

View File

@@ -25,14 +25,17 @@ class IntegrationReadError:
schema: int | None = None
def _read_integration_json_data(
def try_read_integration_json(
project_root: Path,
) -> tuple[dict[str, Any] | None, IntegrationReadError | None]:
"""Read raw integration state without normalizing or raising.
"""Parse ``.specify/integration.json`` without raising.
Returns ``(data, None)`` when the JSON object is readable and supported,
``(None, None)`` when the file is absent, and ``(None, error)`` for parse,
schema, encoding, or filesystem failures.
Returns ``(normalized_state, None)`` on success, ``(None, None)`` when the
file does not exist, or ``(None, error)`` for any parse / validation
failure. This is the single low-level reader; both the CLI's loud
``_read_integration_json`` and the workflow engine's silent
``_load_project_integration`` consume it so the schema guard and parse
logic cannot drift between them.
"""
path = project_root / INTEGRATION_JSON
# Avoid Path.exists() / Path.is_file() as a pre-check: both return False
@@ -67,41 +70,9 @@ def _read_integration_json_data(
and schema > INTEGRATION_STATE_SCHEMA
):
return None, IntegrationReadError(kind="schema_too_new", schema=schema)
return data, None
def try_read_integration_json(
project_root: Path,
) -> tuple[dict[str, Any] | None, IntegrationReadError | None]:
"""Parse ``.specify/integration.json`` without raising.
Returns ``(normalized_state, None)`` on success, ``(None, None)`` when the
file does not exist, or ``(None, error)`` for any parse / validation
failure. This helper delegates file I/O and raw JSON validation to
``_read_integration_json_data`` so callers that need raw state can share
the same low-level reader instead of duplicating parse logic.
"""
data, error = _read_integration_json_data(project_root)
if data is None:
return None, error
return normalize_integration_state(data), None
def try_read_integration_json_with_raw(
project_root: Path,
) -> tuple[dict[str, Any] | None, dict[str, Any] | None, IntegrationReadError | None]:
"""Parse ``integration.json`` and return normalized plus raw state.
Returns ``(normalized_state, raw_state, None)`` when the file is readable,
``(None, None, None)`` when it is absent, and ``(None, None, error)`` for
parse, schema, encoding, or filesystem failures.
"""
data, error = _read_integration_json_data(project_root)
if data is None:
return None, None, error
return normalize_integration_state(data), data, None
def clean_integration_key(key: Any) -> str | None:
"""Return a stripped integration key, or None for empty/non-string values."""
if not isinstance(key, str) or not key.strip():

View File

@@ -1,663 +0,0 @@
"""Read-only status reporting for project integration state."""
from __future__ import annotations
import hashlib
import re
import stat
from pathlib import Path
from typing import Any
from .integration_state import (
INTEGRATION_JSON,
INTEGRATION_STATE_SCHEMA,
IntegrationReadError,
default_integration_key,
installed_integration_keys,
try_read_integration_json_with_raw,
)
from .integrations import INTEGRATION_REGISTRY
from .integrations.manifest import IntegrationManifest
_MANIFEST_READ_ERRORS = (ValueError, OSError)
_MANIFEST_KEY_RE = re.compile(r"^[A-Za-z0-9._-]+$")
_WINDOWS_RESERVED_MANIFEST_BASENAMES = {
"CON",
"PRN",
"AUX",
"NUL",
*(f"COM{i}" for i in range(1, 10)),
*(f"LPT{i}" for i in range(1, 10)),
}
_SHARED_MANIFEST_KEY = "speckit"
def _finding(
severity: str,
code: str,
message: str,
*,
integration: str | None = None,
path: str | None = None,
suggestion: str | None = None,
) -> dict[str, str]:
item = {
"severity": severity,
"code": code,
"message": message,
}
if integration:
item["integration"] = integration
if path:
item["path"] = path
if suggestion:
item["suggestion"] = suggestion
return item
def _status(findings: list[dict[str, str]]) -> str:
if any(item["severity"] == "error" for item in findings):
return "error"
if findings:
return "warning"
return "ok"
def _with_error_detail(message: str, error: IntegrationReadError) -> str:
if error.detail:
return f"{message} Detail: {error.detail}"
return message
def _integration_state_error_message(error: IntegrationReadError) -> str:
if error.kind == "decode":
return _with_error_detail(
f"{INTEGRATION_JSON} contains invalid JSON or is not valid UTF-8.",
error,
)
if error.kind == "os":
return _with_error_detail(f"Could not read {INTEGRATION_JSON}.", error)
if error.kind == "not_object":
return f"{INTEGRATION_JSON} must contain a JSON object, got {error.detail}."
if error.kind == "schema_too_new":
return (
f"{INTEGRATION_JSON} uses integration state schema {error.schema}, "
f"which is newer than this CLI supports; supported schema: {INTEGRATION_STATE_SCHEMA}."
)
return f"Could not inspect {INTEGRATION_JSON}."
def _sha256_file(path: Path) -> str:
h = hashlib.sha256()
with open(path, "rb") as fh:
for chunk in iter(lambda: fh.read(8192), b""):
h.update(chunk)
return h.hexdigest()
def _strip_extended_length_prefix(path: Path) -> Path:
"""Drop the Windows ``\\\\?\\`` extended-length prefix for path comparison.
``os.readlink`` and ``Path.resolve`` can return extended-length paths on
Windows (e.g. ``\\\\?\\C:\\proj``). Comparing such a path against a plain
``C:\\proj`` root via :meth:`Path.relative_to` would spuriously fail, so we
normalise both sides through this helper before containment checks.
"""
raw = str(path)
if raw.startswith("\\\\?\\UNC\\"):
return Path("\\\\" + raw[len("\\\\?\\UNC\\"):])
if raw.startswith("\\\\?\\"):
return Path(raw[len("\\\\?\\"):])
return path
def _is_within_project(project_root_resolved: Path, candidate: Path) -> bool:
"""Return ``True`` when *candidate* stays within *project_root_resolved*.
Both paths are stripped of any Windows extended-length prefix first so that
a target produced by ``os.readlink`` (which may be ``\\\\?\\``-prefixed) is
still recognised as living inside an unprefixed project root.
"""
try:
_strip_extended_length_prefix(candidate).relative_to(
_strip_extended_length_prefix(project_root_resolved)
)
except ValueError:
return False
return True
def _safe_manifest_file(
project_root: Path,
project_root_resolved: Path,
rel: str,
*,
project_root_is_resolved: bool = True,
) -> Path | None:
rel_path = Path(rel)
if rel_path.is_absolute() or ".." in rel_path.parts:
return None
candidate = project_root / rel_path
if not project_root_is_resolved:
walk = project_root
for part in rel_path.parts[:-1]:
walk = walk / part
try:
if walk.is_symlink():
return None
except OSError:
return None
try:
candidate_parent = (
candidate.parent.resolve(strict=False)
if project_root_is_resolved
else candidate.parent.absolute()
)
except (OSError, RuntimeError):
return None
if not _is_within_project(project_root_resolved, candidate_parent):
return None
return candidate
def _tracked_symlink_manifest_status(
path: Path,
project_root_resolved: Path,
*,
project_root_is_resolved: bool = True,
) -> str:
"""Classify a tracked symlink without following it outside the project.
Manifests store content hashes for regular files, so an existing in-project
symlink is still reported as modified. Escaping targets are invalid, and
dangling in-project targets are missing.
"""
try:
target = path.readlink()
except OSError:
return "modified"
target_path = target if target.is_absolute() else path.parent / target
try:
contained_parent = (
target_path.parent.resolve(strict=False)
if project_root_is_resolved
else target_path.parent.absolute()
)
except (OSError, RuntimeError):
return "invalid"
if not _is_within_project(project_root_resolved, contained_parent):
return "invalid"
try:
target_path.lstat()
except FileNotFoundError:
return "missing"
except OSError:
return "modified"
return "modified"
def _resolve_project_root_for_status(
project_root: Path,
findings: list[dict[str, str]],
) -> tuple[Path, bool]:
try:
return project_root.resolve(), True
except (OSError, RuntimeError) as exc:
findings.append(
_finding(
"warning",
"project-root-unresolved",
f"Could not fully resolve project root: {exc}",
suggestion="Check project path permissions and symlinks before relying on manifest path checks.",
)
)
return project_root.absolute(), False
def _is_safe_manifest_key(key: str) -> bool:
if key in {"", ".", ".."}:
return False
if key.endswith("."):
return False
if _MANIFEST_KEY_RE.fullmatch(key) is None:
return False
if key.split(".", 1)[0].upper() in _WINDOWS_RESERVED_MANIFEST_BASENAMES:
return False
if "/" in key or "\\" in key:
return False
key_path = Path(key)
return not key_path.is_absolute() and key_path.name == key
def _manifest_file_status(
manifest: IntegrationManifest,
project_root_resolved: Path,
*,
project_root_is_resolved: bool = True,
) -> tuple[list[str], list[str], list[str], list[str]]:
missing: list[str] = []
modified: list[str] = []
invalid: list[str] = []
valid: list[str] = []
for rel, expected_hash in manifest.files.items():
path = _safe_manifest_file(
manifest.project_root,
project_root_resolved,
rel,
project_root_is_resolved=project_root_is_resolved,
)
if path is None:
invalid.append(rel)
continue
try:
path_stat = path.lstat()
except FileNotFoundError:
valid.append(rel)
missing.append(rel)
continue
except OSError:
valid.append(rel)
modified.append(rel)
continue
is_symlink = stat.S_ISLNK(path_stat.st_mode)
if not is_symlink:
try:
is_symlink = path.is_symlink()
except OSError:
is_symlink = False
if is_symlink:
symlink_status = _tracked_symlink_manifest_status(
path,
project_root_resolved,
project_root_is_resolved=project_root_is_resolved,
)
if symlink_status == "invalid":
invalid.append(rel)
continue
valid.append(rel)
if symlink_status == "missing":
missing.append(rel)
continue
modified.append(rel)
continue
valid.append(rel)
if not stat.S_ISREG(path_stat.st_mode):
modified.append(rel)
continue
try:
if _sha256_file(path) != expected_hash:
modified.append(rel)
except OSError:
modified.append(rel)
return missing, modified, invalid, valid
def _default_not_installed_from_raw_state(raw_state: dict[str, Any]) -> str | None:
if not isinstance(raw_state.get("installed_integrations"), list):
return None
raw_default = default_integration_key(raw_state)
raw_installed = installed_integration_keys(raw_state)
if raw_default and raw_default not in raw_installed:
return raw_default
return None
def _manifest_summary(
manifest_path: Path,
project_root: Path,
*,
readable: bool,
tracked_files: int = 0,
missing_files: list[str] | None = None,
modified_files: list[str] | None = None,
invalid_files: list[str] | None = None,
) -> dict[str, Any]:
return {
"manifest": manifest_path.relative_to(project_root).as_posix(),
"readable": readable,
"tracked_files": tracked_files,
"missing_files": missing_files or [],
"modified_files": modified_files or [],
"invalid_files": invalid_files or [],
}
def _manifest_owner(key: str) -> str:
if key == _SHARED_MANIFEST_KEY:
return "shared Spec Kit infrastructure"
return f"integration '{key}'"
def _manifest_suggestion(key: str, default_key: str | None) -> str:
if key == _SHARED_MANIFEST_KEY:
if default_key and default_key in INTEGRATION_REGISTRY:
return f"Run `specify integration upgrade {default_key}` to regenerate shared managed files."
return (
"Run `specify init --here --force --integration <key>` to regenerate "
"shared managed files."
)
if key not in INTEGRATION_REGISTRY:
return (
"Upgrade Spec Kit, reinstall with a supported CLI version, "
f"or remove the stale integration entry from {INTEGRATION_JSON}."
)
return f"Run `specify integration upgrade {key}` or reinstall the integration."
def build_integration_status_report(project_root: Path) -> dict[str, Any]:
"""Return a machine-readable integration status report for *project_root*."""
findings: list[dict[str, str]] = []
project_root_resolved, project_root_is_resolved = _resolve_project_root_for_status(
project_root,
findings,
)
state, raw_state, error = try_read_integration_json_with_raw(project_root)
if error is not None:
findings.append(
_finding(
"error",
"integration-state-unreadable",
_integration_state_error_message(error),
path=INTEGRATION_JSON,
suggestion=f"Fix or delete {INTEGRATION_JSON}, then retry.",
)
)
return _build_report(None, [], findings, {}, None)
if state is None:
findings.append(
_finding(
"error",
"integration-state-missing",
f"{INTEGRATION_JSON} is missing.",
path=INTEGRATION_JSON,
suggestion="Run `specify integration install <key>` to install an integration.",
)
)
return _build_report(None, [], findings, {}, None)
assert raw_state is not None
raw_default_key = default_integration_key(raw_state)
raw_installed_value = raw_state.get("installed_integrations")
raw_installed_is_list = isinstance(raw_installed_value, list)
raw_installed_keys = (
installed_integration_keys(raw_state)
if raw_installed_is_list
else []
)
default_key = raw_default_key or default_integration_key(state)
installed_keys = installed_integration_keys(state)
raw_default_not_installed = _default_not_installed_from_raw_state(raw_state)
if raw_installed_is_list and raw_default_not_installed and raw_installed_keys:
check_installed_keys = raw_installed_keys
else:
check_installed_keys = installed_keys
recorded_installed_keys = raw_installed_keys
if "installed_integrations" in raw_state and not raw_installed_is_list:
findings.append(
_finding(
"warning",
"installed-integrations-invalid",
(
"installed_integrations must be a list, "
f"got {type(raw_installed_value).__name__}."
),
path=INTEGRATION_JSON,
suggestion=f"Fix {INTEGRATION_JSON}, then retry.",
)
)
if not installed_keys:
findings.append(
_finding(
"warning",
"no-installed-integrations",
"No installed integrations are recorded.",
suggestion="Run `specify integration install <key>` to install one.",
)
)
if raw_installed_keys and raw_default_key is None:
default_key = None
findings.append(
_finding(
"error",
"default-integration-missing",
"No default integration is recorded.",
suggestion="Run `specify integration use <key>` after choosing an installed integration.",
)
)
if raw_default_not_installed:
findings.append(
_finding(
"error",
"default-integration-not-installed",
(
f"Default integration '{raw_default_not_installed}' is not listed "
"in installed_integrations."
),
integration=raw_default_not_installed,
suggestion="Run `specify integration use <key>` for an installed integration, or reinstall the default integration.",
)
)
known_installed = [key for key in check_installed_keys if key in INTEGRATION_REGISTRY]
unknown_installed: list[str] = []
for key in check_installed_keys:
if key not in INTEGRATION_REGISTRY:
unknown_installed.append(key)
findings.append(
_finding(
"error",
"unknown-integration",
f"Integration '{key}' is installed but is not known to this CLI.",
integration=key,
suggestion=(
"Upgrade Spec Kit, reinstall with a supported CLI version, "
f"or remove the stale integration entry from {INTEGRATION_JSON}."
),
)
)
unsafe = [
key for key in known_installed
if not getattr(INTEGRATION_REGISTRY[key], "multi_install_safe", False)
]
if len(check_installed_keys) > 1:
unsafe.extend(unknown_installed)
if len(check_installed_keys) > 1 and unsafe:
findings.append(
_finding(
"error",
"unsafe-multi-install",
(
"Installed integrations are not all declared multi-install safe: "
+ ", ".join(sorted(unsafe))
),
suggestion=(
"Use `specify integration use <key>` to change defaults, "
"or `specify integration switch <key>` only when replacing integrations."
),
)
)
manifest_files_by_path: dict[str, list[str]] = {}
manifest_summaries: dict[str, dict[str, Any]] = {}
attempted_manifest_keys: list[str] = []
manifest_keys = list(check_installed_keys)
if _SHARED_MANIFEST_KEY not in manifest_keys:
manifest_keys.append(_SHARED_MANIFEST_KEY)
for key in manifest_keys:
owner = _manifest_owner(key)
if not _is_safe_manifest_key(key):
findings.append(
_finding(
"error",
"integration-key-invalid",
f"Integration key {key!r} cannot be used as a manifest filename.",
integration=key,
path=INTEGRATION_JSON,
suggestion=f"Fix {INTEGRATION_JSON}, then reinstall the integration.",
)
)
continue
attempted_manifest_keys.append(key)
manifest_path = project_root / ".specify" / "integrations" / f"{key}.manifest.json"
try:
manifest = IntegrationManifest.load(
key,
project_root_resolved,
resolve_project_root=False,
)
except FileNotFoundError:
findings.append(
_finding(
"error",
"manifest-missing",
f"Manifest for {owner} is missing.",
integration=key,
path=manifest_path.relative_to(project_root).as_posix(),
suggestion=_manifest_suggestion(key, default_key),
)
)
manifest_summaries[key] = _manifest_summary(
manifest_path,
project_root,
readable=False,
)
continue
except _MANIFEST_READ_ERRORS as exc:
manifest_summaries[key] = _manifest_summary(
manifest_path,
project_root,
readable=False,
)
findings.append(
_finding(
"error",
"manifest-unreadable",
f"Manifest for {owner} is unreadable: {exc}",
integration=key,
path=manifest_path.relative_to(project_root).as_posix(),
suggestion=_manifest_suggestion(key, default_key),
)
)
continue
missing, modified, invalid, valid_files = _manifest_file_status(
manifest,
project_root_resolved,
project_root_is_resolved=project_root_is_resolved,
)
manifest_summaries[key] = _manifest_summary(
manifest_path,
project_root,
readable=True,
tracked_files=len(manifest.files),
missing_files=missing,
modified_files=modified,
invalid_files=invalid,
)
for rel in valid_files:
manifest_files_by_path.setdefault(rel, []).append(key)
if invalid:
findings.append(
_finding(
"error",
"manifest-paths-invalid",
f"{len(invalid)} unsafe manifest path(s) are recorded for {owner}.",
integration=key,
path=manifest_path.relative_to(project_root).as_posix(),
suggestion=_manifest_suggestion(key, default_key),
)
)
if missing:
findings.append(
_finding(
"error",
"managed-files-missing",
f"{len(missing)} managed file(s) are missing for {owner}.",
integration=key,
suggestion=_manifest_suggestion(key, default_key),
)
)
if modified:
findings.append(
_finding(
"warning",
"managed-files-modified",
f"{len(modified)} managed file(s) were modified for {owner}.",
integration=key,
suggestion="Review the changes before running `specify integration upgrade --force`.",
)
)
for rel, keys in sorted(manifest_files_by_path.items()):
if len(keys) > 1:
findings.append(
_finding(
"warning",
"managed-file-collision",
f"Managed file '{rel}' is tracked by multiple integrations: {', '.join(sorted(keys))}.",
path=rel,
suggestion="Review the manifests before uninstalling or upgrading these integrations.",
)
)
if not raw_installed_is_list or not raw_installed_keys:
multi_install_safe = None
else:
multi_install_safe = not (len(check_installed_keys) > 1 and unsafe)
return _build_report(
default_key,
installed_keys,
findings,
manifest_summaries,
multi_install_safe,
manifest_checked_keys=attempted_manifest_keys,
recorded_installed_keys=recorded_installed_keys,
)
def _build_report(
default_key: str | None,
installed_keys: list[str],
findings: list[dict[str, str]],
manifests: dict[str, dict[str, Any]],
multi_install_safe: bool | None,
*,
manifest_checked_keys: list[str] | None = None,
recorded_installed_keys: list[str] | None = None,
) -> dict[str, Any]:
missing_count = sum(len(item.get("missing_files", [])) for item in manifests.values())
modified_count = sum(len(item.get("modified_files", [])) for item in manifests.values())
invalid_count = sum(len(item.get("invalid_files", [])) for item in manifests.values())
unchecked_count = sum(1 for item in manifests.values() if not item.get("readable", True))
return {
"status": _status(findings),
"default_integration": default_key,
"installed_integrations": installed_keys,
"recorded_installed_integrations": (
installed_keys if recorded_installed_keys is None else recorded_installed_keys
),
"manifest_checked_integrations": (
installed_keys if manifest_checked_keys is None else manifest_checked_keys
),
"multi_install_safe": multi_install_safe,
"shared_templates_target_alignment": default_key,
"missing_managed_files": missing_count,
"modified_managed_files": modified_count,
"invalid_manifest_paths": invalid_count,
"unchecked_manifests": unchecked_count,
"manifests": manifests,
"findings": findings,
}

View File

@@ -1,12 +1,10 @@
"""specify integration list/status/use/search/info + catalog list/add/remove command handlers."""
"""specify integration list/use/search/info + catalog list/add/remove command handlers."""
from __future__ import annotations
import json
import os
from typing import Any, Optional
from typing import Optional
import typer
from rich.markup import escape as _rich_escape
from rich.table import Table
from .._console import console
@@ -122,86 +120,6 @@ def integration_list(
console.print("Install one with: [cyan]specify integration install <key>[/cyan]")
def _print_integration_status_report(report: dict[str, Any]) -> None:
status = report["status"]
status_label = {
"ok": "[green]OK[/green]",
"warning": "[yellow]WARNING[/yellow]",
"error": "[red]ERROR[/red]",
}.get(str(status), str(status).upper())
installed = report.get("installed_integrations") or []
installed_display = ", ".join(_rich_escape(str(item)) for item in installed)
console.print(f"Integration status: {status_label}")
console.print(
f"Default integration: {_rich_escape(str(report.get('default_integration') or 'none'))}"
)
console.print(f"Installed integrations: {installed_display if installed else 'none'}")
multi_install_safe = report.get("multi_install_safe")
if multi_install_safe is None:
multi_install_safe_display = "unknown"
else:
multi_install_safe_display = "yes" if multi_install_safe else "no"
console.print(f"Multi-install safe: {multi_install_safe_display}")
console.print(
f"Shared templates target alignment: "
f"{_rich_escape(str(report.get('shared_templates_target_alignment') or 'none'))}"
)
console.print(f"Modified managed files: {report.get('modified_managed_files', 0)}")
console.print(f"Missing managed files: {report.get('missing_managed_files', 0)}")
console.print(f"Invalid manifest paths: {report.get('invalid_manifest_paths', 0)}")
console.print(f"Unchecked manifests: {report.get('unchecked_manifests', 0)}")
findings = report.get("findings") or []
if not findings:
return
console.print()
console.print("[bold]Findings:[/bold]")
for item in findings:
severity = item.get("severity", "")
severity_label = {
"error": "[red]error[/red]",
"warning": "[yellow]warning[/yellow]",
}.get(severity, severity)
prefix = f"- {severity_label} {_rich_escape(str(item.get('code', '')))}"
if item.get("integration"):
prefix += f" ({_rich_escape(str(item['integration']))})"
console.print(
f"{prefix}: {_rich_escape(str(item.get('message', '')))}",
soft_wrap=True,
)
if item.get("suggestion"):
console.print(
f" Suggestion: {_rich_escape(str(item['suggestion']))}",
soft_wrap=True,
)
@integration_app.command("status")
def integration_status(
json_output: bool = typer.Option(
False,
"--json",
help="Emit machine-readable integration status.",
),
):
"""Report the current project's integration status without changing files."""
from .. import _require_specify_project
from ..integration_status import build_integration_status_report
project_root = _require_specify_project()
report = build_integration_status_report(project_root)
if json_output:
typer.echo(json.dumps(report, indent=2))
else:
_print_integration_status_report(report)
if report["status"] == "error":
raise typer.Exit(1)
@integration_app.command("use")
def integration_use(
key: str = typer.Argument(help="Installed integration key to make the default"),

View File

@@ -108,23 +108,11 @@ class IntegrationManifest:
key: Integration identifier (e.g. ``"copilot"``).
project_root: Absolute path to the project directory.
version: CLI version string recorded in the manifest.
resolve_project_root: Resolve ``project_root`` before using it.
"""
def __init__(
self,
key: str,
project_root: Path,
version: str = "",
*,
resolve_project_root: bool = True,
) -> None:
def __init__(self, key: str, project_root: Path, version: str = "") -> None:
self.key = key
self.project_root = (
project_root.resolve()
if resolve_project_root
else project_root.absolute()
)
self.project_root = project_root.resolve()
self.version = version
self._files: dict[str, str] = {} # rel_path → sha256 hex
self._recovered_files: set[str] = set()
@@ -399,18 +387,12 @@ class IntegrationManifest:
return path
@classmethod
def load(
cls,
key: str,
project_root: Path,
*,
resolve_project_root: bool = True,
) -> IntegrationManifest:
def load(cls, key: str, project_root: Path) -> IntegrationManifest:
"""Load an existing manifest from disk.
Raises ``FileNotFoundError`` if the manifest does not exist.
"""
inst = cls(key, project_root, resolve_project_root=resolve_project_root)
inst = cls(key, project_root)
path = inst.manifest_path
try:
data = json.loads(path.read_text(encoding="utf-8"))

View File

@@ -1892,48 +1892,6 @@ class PresetCatalog:
download_url, self._open_url, timeout=timeout
)
def _validate_catalog_payload(self, catalog_data: Any, url: str) -> None:
"""Validate a parsed preset-catalog payload's shape.
Applied to both network-fetched and cache-loaded payloads so a
once-poisoned cache (older spec-kit version, manual edit, upstream
served a bad payload before the network-side guards were added)
cannot re-crash ``_get_merged_packs`` on subsequent calls.
Checking only key presence would let a payload like
``{"presets": []}`` or ``{"presets": null}`` slip through here and
then crash with ``AttributeError: 'list' object has no attribute
'items'`` deep inside ``_get_merged_packs``. The sibling
integration catalog reader already guards both the root object and
the nested mapping (see ``integrations/catalog.py``); the preset
catalog must stay consistent so a malformed payload surfaces as
the user-facing ``Invalid preset catalog format`` error instead of
a raw Python traceback.
Args:
catalog_data: Parsed JSON payload from the catalog source.
url: Source URL — used in the error message so the user can
tell which catalog in a multi-catalog stack is malformed.
Raises:
PresetError: If the payload's shape is invalid.
"""
if not isinstance(catalog_data, dict):
raise PresetError(
f"Invalid preset catalog format from {url}: "
"expected a JSON object"
)
if (
"schema_version" not in catalog_data
or "presets" not in catalog_data
):
raise PresetError(f"Invalid preset catalog format from {url}")
if not isinstance(catalog_data.get("presets"), dict):
raise PresetError(
f"Invalid preset catalog format from {url}: "
"'presets' must be a JSON object"
)
def _load_catalog_config(self, config_path: Path) -> Optional[List[PresetCatalogEntry]]:
"""Load catalog stack configuration from a YAML file.
@@ -2095,7 +2053,7 @@ class PresetCatalog:
if not cache_file.exists() or not metadata_file.exists():
return False
try:
metadata = json.loads(metadata_file.read_text(encoding="utf-8"))
metadata = json.loads(metadata_file.read_text())
cached_at = datetime.fromisoformat(metadata.get("cached_at", ""))
if cached_at.tzinfo is None:
cached_at = cached_at.replace(tzinfo=timezone.utc)
@@ -2103,23 +2061,7 @@ class PresetCatalog:
datetime.now(timezone.utc) - cached_at
).total_seconds()
return age_seconds < self.CACHE_DURATION
except (
json.JSONDecodeError,
OSError,
UnicodeError,
ValueError,
KeyError,
TypeError,
AttributeError,
):
# Cache validity is best-effort: invalid/missing fields, an
# unreadable metadata file (permissions / disk), a wrongly
# encoded one (written by a tool using the system locale
# codec), or a metadata payload that parses to a non-mapping
# like ``[]`` or ``"oops"`` (so ``metadata.get(...)`` raises
# ``AttributeError``) all degrade to "cache invalid" so the
# caller falls through to a network refetch instead of
# crashing.
except (json.JSONDecodeError, ValueError, KeyError, TypeError):
return False
def _fetch_single_catalog(self, entry: PresetCatalogEntry, force_refresh: bool = False) -> Dict[str, Any]:
@@ -2137,55 +2079,29 @@ class PresetCatalog:
"""
cache_file, metadata_file = self._get_cache_paths(entry.url)
# Use cache if valid. A previously-cached payload must clear the
# same shape checks as a freshly-fetched one — otherwise a once-
# poisoned cache would re-crash on every invocation despite the
# cache being "valid" by age. If validation fails on the cached
# read, fall through to the network fetch path so the cache gets
# refreshed.
if not force_refresh and self._is_url_cache_valid(entry.url):
try:
cached_data = json.loads(cache_file.read_text(encoding="utf-8"))
self._validate_catalog_payload(cached_data, entry.url)
return cached_data
except (json.JSONDecodeError, OSError, UnicodeError, PresetError):
# Cache is best-effort: a JSON-decode failure, an OS-level
# read failure (permissions / disk / handle limit), or a
# text-encoding failure on a cache file written by an
# older client all fall through to the network fetch path.
# Only the network failure is surfaced to the caller.
return json.loads(cache_file.read_text())
except json.JSONDecodeError:
pass
try:
with self._open_url(entry.url, timeout=10) as response:
catalog_data = json.loads(response.read())
self._validate_catalog_payload(catalog_data, entry.url)
if (
"schema_version" not in catalog_data
or "presets" not in catalog_data
):
raise PresetError("Invalid preset catalog format")
# Both files are written explicitly as UTF-8 to match the
# ``read_text(encoding="utf-8")`` on the read side and the
# ``integrations/catalog.py`` precedent. Without this,
# platforms whose default encoding isn't UTF-8 would write
# locale-encoded bytes the read path can't decode, forcing an
# unnecessary refetch on every invocation. The write itself
# is best-effort like the read side: an unwritable cache dir
# (read-only checkout, permissions) must not be re-raised as
# a ``PresetError`` for a payload that was already fetched
# and validated.
try:
self.cache_dir.mkdir(parents=True, exist_ok=True)
cache_file.write_text(
json.dumps(catalog_data, indent=2), encoding="utf-8"
)
metadata = {
"cached_at": datetime.now(timezone.utc).isoformat(),
"catalog_url": entry.url,
}
metadata_file.write_text(
json.dumps(metadata, indent=2), encoding="utf-8"
)
except OSError:
pass # Cache is best-effort; proceed with fetched data
self.cache_dir.mkdir(parents=True, exist_ok=True)
cache_file.write_text(json.dumps(catalog_data, indent=2))
metadata = {
"cached_at": datetime.now(timezone.utc).isoformat(),
"catalog_url": entry.url,
}
metadata_file.write_text(json.dumps(metadata, indent=2))
return catalog_data
@@ -2211,17 +2127,6 @@ class PresetCatalog:
try:
data = self._fetch_single_catalog(entry, force_refresh)
for pack_id, pack_data in data.get("presets", {}).items():
# Per-entry guard: ``_fetch_single_catalog`` already
# validates that ``data["presets"]`` is a mapping, but it
# does not (and should not) validate every entry shape
# there — one malformed entry shouldn't poison an
# otherwise valid catalog. Skip non-mapping entries here
# so a payload like ``{"presets": {"foo": [], "bar":
# {...}}}`` still merges the valid entries without
# crashing on ``**pack_data``. Mirrors
# ``integrations/catalog.py:245``.
if not isinstance(pack_data, dict):
continue
pack_data_with_catalog = {**pack_data, "_catalog_name": entry.name, "_install_allowed": entry.install_allowed}
merged[pack_id] = pack_data_with_catalog
except PresetError:
@@ -2232,12 +2137,6 @@ class PresetCatalog:
def is_cache_valid(self) -> bool:
"""Check if cached catalog is still valid.
Returns ``False`` for any read/decoding failure on the metadata
file (missing fields, malformed JSON, permissions / disk errors,
wrong text encoding) so callers fall through to a network refetch
instead of crashing. Treating cache validity as best-effort
matches the contract used by ``_is_url_cache_valid`` above.
Returns:
True if cache exists and is within cache duration
"""
@@ -2245,9 +2144,7 @@ class PresetCatalog:
return False
try:
metadata = json.loads(
self.cache_metadata_file.read_text(encoding="utf-8")
)
metadata = json.loads(self.cache_metadata_file.read_text())
cached_at = datetime.fromisoformat(metadata.get("cached_at", ""))
if cached_at.tzinfo is None:
cached_at = cached_at.replace(tzinfo=timezone.utc)
@@ -2255,20 +2152,7 @@ class PresetCatalog:
datetime.now(timezone.utc) - cached_at
).total_seconds()
return age_seconds < self.CACHE_DURATION
except (
json.JSONDecodeError,
OSError,
UnicodeError,
ValueError,
KeyError,
TypeError,
AttributeError,
):
# ``AttributeError`` covers the case where the metadata file
# parses to a non-mapping (``[]``, ``"oops"``, ``42``) so
# ``metadata.get(...)`` would otherwise crash. All decode /
# shape failures degrade to "cache invalid" so the caller
# falls through to a network refetch.
except (json.JSONDecodeError, ValueError, KeyError, TypeError):
return False
def fetch_catalog(self, force_refresh: bool = False) -> Dict[str, Any]:
@@ -2285,61 +2169,35 @@ class PresetCatalog:
"""
catalog_url = self.get_catalog_url()
# Match the ``_fetch_single_catalog`` cache contract: a poisoned
# or unreadable cache silently falls through to a network refetch
# rather than crashing the caller. ``_validate_catalog_payload``
# is reused here so a cache written by an older client
# (pre-validation) is rejected and refreshed instead of returning
# the stale malformed payload.
if not force_refresh and self.is_cache_valid():
try:
metadata = json.loads(
self.cache_metadata_file.read_text(encoding="utf-8")
)
metadata = json.loads(self.cache_metadata_file.read_text())
if metadata.get("catalog_url") == catalog_url:
cached_data = json.loads(
self.cache_file.read_text(encoding="utf-8")
)
self._validate_catalog_payload(cached_data, catalog_url)
return cached_data
except (json.JSONDecodeError, OSError, UnicodeError, PresetError):
# Cache is corrupt, unreadable, or fails the shape check;
# fall through to network fetch.
return json.loads(self.cache_file.read_text())
except (json.JSONDecodeError, OSError):
# Cache is corrupt or unreadable; fall through to network fetch
pass
try:
with self._open_url(catalog_url, timeout=10) as response:
catalog_data = json.loads(response.read())
# Validate catalog structure. Reuses the same helper as
# ``_fetch_single_catalog`` so all three branches (root type,
# missing keys, nested-mapping type) stay consistent.
self._validate_catalog_payload(catalog_data, catalog_url)
if (
"schema_version" not in catalog_data
or "presets" not in catalog_data
):
raise PresetError("Invalid preset catalog format")
# Save to cache. Explicit UTF-8 on both writes mirrors the
# ``read_text(encoding="utf-8")`` on the read side and the
# ``integrations/catalog.py`` precedent — otherwise platforms
# whose default encoding isn't UTF-8 would write
# locale-encoded bytes the read path can't decode, forcing an
# unnecessary refetch on every invocation. Like the read
# side, the write is best-effort: an unwritable cache dir
# must not be re-raised as a ``PresetError`` for a payload
# that was already fetched and validated.
try:
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.cache_file.write_text(
json.dumps(catalog_data, indent=2), encoding="utf-8"
)
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.cache_file.write_text(json.dumps(catalog_data, indent=2))
metadata = {
"cached_at": datetime.now(timezone.utc).isoformat(),
"catalog_url": catalog_url,
}
self.cache_metadata_file.write_text(
json.dumps(metadata, indent=2), encoding="utf-8"
)
except OSError:
pass # Cache is best-effort; proceed with fetched data
metadata = {
"cached_at": datetime.now(timezone.utc).isoformat(),
"catalog_url": catalog_url,
}
self.cache_metadata_file.write_text(
json.dumps(metadata, indent=2)
)
return catalog_data

View File

@@ -2,10 +2,7 @@
import json
import os
import shutil
from pathlib import Path
import pytest
from typer.testing import CliRunner
from specify_cli import app
@@ -50,32 +47,6 @@ def _write_invalid_manifest(project, key):
return manifest
def _copy_project_template(tmp_path, template):
project = tmp_path / "proj"
shutil.copytree(template, project)
return project
@pytest.fixture(scope="module")
def status_copilot_template(tmp_path_factory):
return _init_project(tmp_path_factory.mktemp("status-copilot"), "copilot")
@pytest.fixture(scope="module")
def status_claude_template(tmp_path_factory):
return _init_project(tmp_path_factory.mktemp("status-claude"), "claude")
@pytest.fixture
def copilot_project(tmp_path, status_copilot_template):
return _copy_project_template(tmp_path, status_copilot_template)
@pytest.fixture
def claude_project(tmp_path, status_claude_template):
return _copy_project_template(tmp_path, status_claude_template)
def _integration_list_row_cells(output: str, key: str) -> list[str]:
plain = strip_ansi(output)
row = next(line for line in plain.splitlines() if line.startswith(f"{key}"))
@@ -155,823 +126,6 @@ class TestIntegrationList:
assert "only supports schema 1" in normalized
# ── status ───────────────────────────────────────────────────────────
class TestIntegrationStatus:
def test_status_requires_speckit_project(self, tmp_path, monkeypatch):
monkeypatch.chdir(tmp_path)
result = runner.invoke(app, ["integration", "status"])
assert result.exit_code != 0
assert "Not a spec-kit project" in result.output
def test_status_reports_healthy_project(self, copilot_project):
result = _run_in_project(copilot_project, ["integration", "status"])
assert result.exit_code == 0
assert "Integration status: OK" in result.output
assert "Default integration: copilot" in result.output
assert "Installed integrations: copilot" in result.output
assert "Shared templates target alignment: copilot" in result.output
assert "Modified managed files: 0" in result.output
assert "Missing managed files: 0" in result.output
def test_status_json_reports_healthy_project(self, copilot_project):
result = _run_in_project(copilot_project, ["integration", "status", "--json"])
assert result.exit_code == 0
payload = json.loads(result.output)
assert payload["status"] == "ok"
assert payload["default_integration"] == "copilot"
assert payload["installed_integrations"] == ["copilot"]
assert payload["recorded_installed_integrations"] == ["copilot"]
assert payload["manifest_checked_integrations"] == ["copilot", "speckit"]
assert payload["multi_install_safe"] is True
assert payload["shared_templates_target_alignment"] == "copilot"
assert "shared_templates_aligned_to" not in payload
assert payload["findings"] == []
def test_status_reports_invalid_integration_json(self, copilot_project):
(copilot_project / ".specify" / "integration.json").write_text("{", encoding="utf-8")
result = _run_in_project(copilot_project, ["integration", "status"])
assert result.exit_code != 0
assert "integration-state-unreadable" in result.output
assert "invalid JSON" in result.output
assert "Detail:" in result.output
assert "Multi-install safe: unknown" in result.output
assert "Traceback" not in result.output
def test_status_json_reports_unknown_multi_install_safety_when_state_unreadable(
self,
copilot_project,
):
(copilot_project / ".specify" / "integration.json").write_text("{", encoding="utf-8")
result = _run_in_project(copilot_project, ["integration", "status", "--json"])
assert result.exit_code != 0
payload = json.loads(result.output)
assert payload["status"] == "error"
assert payload["multi_install_safe"] is None
assert payload["manifest_checked_integrations"] == []
assert payload["findings"][0]["code"] == "integration-state-unreadable"
assert "Detail:" in payload["findings"][0]["message"]
def test_status_reports_supported_schema_for_newer_integration_state(self, copilot_project):
state_path = copilot_project / ".specify" / "integration.json"
state = json.loads(state_path.read_text(encoding="utf-8"))
state["integration_state_schema"] = 99
state_path.write_text(json.dumps(state), encoding="utf-8")
result = _run_in_project(copilot_project, ["integration", "status", "--json"])
assert result.exit_code != 0
payload = json.loads(result.output)
assert payload["findings"][0]["code"] == "integration-state-unreadable"
assert "schema 99" in payload["findings"][0]["message"]
assert "supported schema: 1" in payload["findings"][0]["message"]
def test_status_reports_missing_integration_json(self, copilot_project):
(copilot_project / ".specify" / "integration.json").unlink()
result = _run_in_project(copilot_project, ["integration", "status"])
assert result.exit_code != 0
assert "integration-state-missing" in result.output
assert ".specify/integration.json is missing" in result.output
assert "Multi-install safe: unknown" in result.output
def test_status_json_reports_unknown_multi_install_safety_when_state_missing(
self,
copilot_project,
):
(copilot_project / ".specify" / "integration.json").unlink()
result = _run_in_project(copilot_project, ["integration", "status", "--json"])
assert result.exit_code != 0
payload = json.loads(result.output)
assert payload["status"] == "error"
assert payload["multi_install_safe"] is None
assert payload["manifest_checked_integrations"] == []
assert payload["findings"][0]["code"] == "integration-state-missing"
def test_status_json_reports_no_installed_integrations_as_warning(self, copilot_project):
state_path = copilot_project / ".specify" / "integration.json"
state_path.write_text(
json.dumps({
"version": "test",
"integration_state_schema": 1,
"installed_integrations": [],
}),
encoding="utf-8",
)
result = _run_in_project(copilot_project, ["integration", "status", "--json"])
assert result.exit_code == 0
payload = json.loads(result.output)
assert payload["status"] == "warning"
assert payload["installed_integrations"] == []
assert payload["multi_install_safe"] is None
assert payload["manifest_checked_integrations"] == ["speckit"]
assert payload["findings"][0]["code"] == "no-installed-integrations"
assert "speckit" in payload["manifests"]
assert payload["manifests"]["speckit"]["readable"] is True
def test_status_checks_shared_manifest_when_no_integrations_installed(self, copilot_project):
state_path = copilot_project / ".specify" / "integration.json"
state_path.write_text(
json.dumps({
"version": "test",
"integration_state_schema": 1,
"installed_integrations": [],
}),
encoding="utf-8",
)
(copilot_project / ".specify" / "integrations" / "speckit.manifest.json").unlink()
result = _run_in_project(copilot_project, ["integration", "status", "--json"])
assert result.exit_code != 0
payload = json.loads(result.output)
assert payload["status"] == "error"
assert payload["installed_integrations"] == []
assert payload["manifest_checked_integrations"] == ["speckit"]
assert payload["unchecked_manifests"] == 1
assert any(
item["code"] == "no-installed-integrations"
for item in payload["findings"]
)
assert any(
item["code"] == "manifest-missing"
and item["integration"] == "speckit"
for item in payload["findings"]
)
def test_status_json_reports_missing_default_integration_as_error(self, claude_project):
state_path = claude_project / ".specify" / "integration.json"
state = json.loads(state_path.read_text(encoding="utf-8"))
state.pop("default_integration", None)
state.pop("integration", None)
state["installed_integrations"] = ["claude"]
state_path.write_text(json.dumps(state), encoding="utf-8")
result = _run_in_project(claude_project, ["integration", "status", "--json"])
assert result.exit_code != 0
payload = json.loads(result.output)
assert payload["status"] == "error"
assert payload["default_integration"] is None
assert any(
item["code"] == "default-integration-missing"
for item in payload["findings"]
)
def test_status_ignores_non_list_raw_installed_integrations(self, copilot_project):
state_path = copilot_project / ".specify" / "integration.json"
state = json.loads(state_path.read_text(encoding="utf-8"))
state.pop("default_integration", None)
state.pop("integration", None)
state["installed_integrations"] = "copilot"
state_path.write_text(json.dumps(state), encoding="utf-8")
result = _run_in_project(copilot_project, ["integration", "status", "--json"])
assert result.exit_code == 0
payload = json.loads(result.output)
assert payload["status"] == "warning"
assert payload["installed_integrations"] == []
assert payload["recorded_installed_integrations"] == []
assert payload["manifest_checked_integrations"] == ["speckit"]
assert payload["multi_install_safe"] is None
assert [item["code"] for item in payload["findings"]] == [
"installed-integrations-invalid",
"no-installed-integrations",
]
def test_status_reports_non_list_raw_installed_integrations_with_default(self, copilot_project):
state_path = copilot_project / ".specify" / "integration.json"
state = json.loads(state_path.read_text(encoding="utf-8"))
state["default_integration"] = "copilot"
state["integration"] = "copilot"
state["installed_integrations"] = "copilot"
state_path.write_text(json.dumps(state), encoding="utf-8")
result = _run_in_project(copilot_project, ["integration", "status", "--json"])
assert result.exit_code == 0
payload = json.loads(result.output)
assert payload["status"] == "warning"
assert payload["installed_integrations"] == ["copilot"]
assert payload["recorded_installed_integrations"] == []
assert payload["manifest_checked_integrations"] == ["copilot", "speckit"]
assert payload["multi_install_safe"] is None
assert [item["code"] for item in payload["findings"]] == [
"installed-integrations-invalid",
]
def test_status_reports_default_integration_not_installed(self, claude_project):
state_path = claude_project / ".specify" / "integration.json"
state = json.loads(state_path.read_text(encoding="utf-8"))
state["default_integration"] = "codex"
state["integration"] = "codex"
state["installed_integrations"] = ["claude"]
state_path.write_text(json.dumps(state), encoding="utf-8")
result = _run_in_project(claude_project, ["integration", "status", "--json"])
assert result.exit_code != 0
payload = json.loads(result.output)
assert payload["default_integration"] == "codex"
assert payload["installed_integrations"] == ["codex", "claude"]
assert payload["recorded_installed_integrations"] == ["claude"]
assert payload["manifest_checked_integrations"] == ["claude", "speckit"]
assert any(
item["code"] == "default-integration-not-installed"
and "Default integration 'codex' is not listed" in item["message"]
for item in payload["findings"]
)
assert "codex" not in payload["manifests"]
assert not any(
item["code"] == "manifest-missing" and item.get("integration") == "codex"
for item in payload["findings"]
)
def test_status_checks_effective_default_manifest_when_raw_installed_is_empty(self, claude_project):
state_path = claude_project / ".specify" / "integration.json"
state = json.loads(state_path.read_text(encoding="utf-8"))
state["installed_integrations"] = []
state_path.write_text(json.dumps(state), encoding="utf-8")
result = _run_in_project(claude_project, ["integration", "status", "--json"])
assert result.exit_code != 0
payload = json.loads(result.output)
assert payload["installed_integrations"] == ["claude"]
assert payload["recorded_installed_integrations"] == []
assert payload["manifest_checked_integrations"] == ["claude", "speckit"]
assert payload["multi_install_safe"] is None
assert payload["manifests"]["claude"]["readable"] is True
assert any(
item["code"] == "default-integration-not-installed"
for item in payload["findings"]
)
def test_status_reports_missing_manifest(self, copilot_project):
(copilot_project / ".specify" / "integrations" / "copilot.manifest.json").unlink()
result = _run_in_project(copilot_project, ["integration", "status"])
assert result.exit_code != 0
assert "manifest-missing" in result.output
assert "Manifest for integration 'copilot' is missing" in result.output
def test_status_reports_unreadable_manifest_in_json_summary(self, copilot_project):
_write_invalid_manifest(copilot_project, "copilot")
result = _run_in_project(copilot_project, ["integration", "status", "--json"])
assert result.exit_code != 0
payload = json.loads(result.output)
assert payload["unchecked_manifests"] == 1
assert payload["manifests"]["copilot"]["readable"] is False
assert payload["manifests"]["copilot"]["missing_files"] == []
assert payload["manifests"]["copilot"]["modified_files"] == []
def test_status_reports_modified_managed_files_without_failing(self, copilot_project):
manifest_path = copilot_project / ".specify" / "integrations" / "copilot.manifest.json"
tracked_files = json.loads(manifest_path.read_text(encoding="utf-8"))["files"]
first_rel = next(iter(tracked_files))
(copilot_project / first_rel).write_text("MODIFIED CONTENT\n", encoding="utf-8")
result = _run_in_project(copilot_project, ["integration", "status"])
assert result.exit_code == 0
assert "Integration status: WARNING" in result.output
assert "managed-files-modified" in result.output
assert "Modified managed files: 1" in result.output
def test_status_reports_missing_managed_files(self, copilot_project):
manifest_path = copilot_project / ".specify" / "integrations" / "copilot.manifest.json"
tracked_files = json.loads(manifest_path.read_text(encoding="utf-8"))["files"]
first_rel = next(iter(tracked_files))
(copilot_project / first_rel).unlink()
result = _run_in_project(copilot_project, ["integration", "status"])
assert result.exit_code != 0
assert "managed-files-missing" in result.output
assert "Missing managed files: 1" in result.output
def test_status_reports_missing_shared_managed_files(self, copilot_project):
shared_file = copilot_project / ".specify" / "scripts" / "bash" / "common.sh"
assert shared_file.exists()
shared_file.unlink()
result = _run_in_project(copilot_project, ["integration", "status"])
assert result.exit_code != 0
assert "managed-files-missing" in result.output
assert "shared Spec Kit infrastructure" in result.output
assert "Missing managed files: 1" in result.output
def test_status_does_not_use_exists_precheck_for_managed_files(self, tmp_path, monkeypatch):
from specify_cli.integration_status import _manifest_file_status
from specify_cli.integrations.manifest import IntegrationManifest
project = tmp_path / "proj"
project.mkdir()
tracked = project / "tracked.md"
tracked.write_text("content\n", encoding="utf-8")
manifest = IntegrationManifest("test", project, version="test")
manifest.record_existing("tracked.md")
def fail_exists(self):
raise AssertionError(f"Path.exists() should not be used for {self}")
monkeypatch.setattr(Path, "exists", fail_exists)
missing, modified, invalid, valid = _manifest_file_status(
manifest,
project.resolve(),
)
assert missing == []
assert modified == []
assert invalid == []
assert valid == ["tracked.md"]
def test_status_does_not_use_exists_precheck_for_manifest_load(self, copilot_project, monkeypatch):
def fail_exists(self):
raise AssertionError(f"Path.exists() should not be used for {self}")
monkeypatch.setattr(Path, "exists", fail_exists)
result = _run_in_project(copilot_project, ["integration", "status", "--json"])
assert result.exit_code == 0
payload = json.loads(result.output)
assert payload["status"] == "ok"
assert payload["manifests"]["copilot"]["readable"] is True
def test_status_reports_unresolved_project_root_without_crashing(self, copilot_project, monkeypatch):
original_resolve = Path.resolve
failed = {"done": False}
def fail_first_project_root_resolve(self, *args, **kwargs):
if self == copilot_project and not failed["done"]:
failed["done"] = True
raise RuntimeError("symlink loop")
return original_resolve(self, *args, **kwargs)
monkeypatch.setattr(Path, "resolve", fail_first_project_root_resolve)
result = _run_in_project(copilot_project, ["integration", "status", "--json"])
assert result.exit_code == 0
payload = json.loads(result.output)
assert payload["status"] == "warning"
assert any(item["code"] == "project-root-unresolved" for item in payload["findings"])
def test_status_loads_manifests_when_project_root_resolution_keeps_failing(
self,
copilot_project,
monkeypatch,
):
original_resolve = Path.resolve
def fail_project_root_resolve(self, *args, **kwargs):
if self == copilot_project:
raise RuntimeError("symlink loop")
return original_resolve(self, *args, **kwargs)
monkeypatch.setattr(Path, "resolve", fail_project_root_resolve)
result = _run_in_project(copilot_project, ["integration", "status", "--json"])
assert result.exit_code == 0, result.output
payload = json.loads(result.output)
assert payload["status"] == "warning"
assert payload["manifests"]["copilot"]["readable"] is True
assert payload["manifests"]["speckit"]["readable"] is True
assert any(item["code"] == "project-root-unresolved" for item in payload["findings"])
def test_status_uses_lexical_manifest_paths_when_project_root_resolution_falls_back(self, tmp_path):
from specify_cli.integration_status import _manifest_file_status
from specify_cli.integrations.manifest import IntegrationManifest
real_project = tmp_path / "real-project"
real_project.mkdir()
tracked = real_project / "tracked.md"
tracked.write_text("content\n", encoding="utf-8")
symlinked_project = tmp_path / "symlinked-project"
try:
symlinked_project.symlink_to(real_project, target_is_directory=True)
except OSError as exc:
pytest.skip(f"symlinks unavailable: {exc}")
manifest = IntegrationManifest("test", real_project, version="test")
manifest.record_existing("tracked.md")
manifest.project_root = symlinked_project.absolute()
missing, modified, invalid, valid = _manifest_file_status(
manifest,
symlinked_project.absolute(),
project_root_is_resolved=False,
)
assert missing == []
assert modified == []
assert invalid == []
assert valid == ["tracked.md"]
def test_status_treats_resolve_runtime_error_as_invalid_path(self, tmp_path, monkeypatch):
from specify_cli.integration_status import _manifest_file_status
from specify_cli.integrations.manifest import IntegrationManifest
project = tmp_path / "proj"
project.mkdir()
tracked = project / "tracked.md"
tracked.write_text("content\n", encoding="utf-8")
manifest = IntegrationManifest("test", project, version="test")
manifest.record_existing("tracked.md")
project_root_resolved = project.resolve()
original_resolve = Path.resolve
def fail_project_parent_resolve(self, *args, **kwargs):
if self == project:
raise RuntimeError("symlink loop")
return original_resolve(self, *args, **kwargs)
monkeypatch.setattr(Path, "resolve", fail_project_parent_resolve)
missing, modified, invalid, valid = _manifest_file_status(
manifest,
project_root_resolved,
)
assert missing == []
assert modified == []
assert invalid == ["tracked.md"]
assert valid == []
def test_status_does_not_mask_runtime_errors_from_manifest_load(self, copilot_project, monkeypatch):
from specify_cli import integration_status as status_module
def fail_load(key, project_root, **kwargs):
raise RuntimeError(f"unexpected manifest loader bug for {key}")
monkeypatch.setattr(status_module.IntegrationManifest, "load", fail_load)
with pytest.raises(RuntimeError, match="unexpected manifest loader bug"):
status_module.build_integration_status_report(copilot_project)
def test_status_treats_dangling_symlink_as_missing(self, copilot_project):
manifest_path = copilot_project / ".specify" / "integrations" / "copilot.manifest.json"
tracked_files = json.loads(manifest_path.read_text(encoding="utf-8"))["files"]
first_rel = next(iter(tracked_files))
target = copilot_project / first_rel
target.unlink()
try:
target.symlink_to(copilot_project / "missing-target")
except OSError as exc:
pytest.skip(f"symlinks unavailable: {exc}")
result = _run_in_project(copilot_project, ["integration", "status", "--json"])
assert result.exit_code != 0
payload = json.loads(result.output)
assert first_rel in payload["manifests"]["copilot"]["missing_files"]
assert first_rel not in payload["manifests"]["copilot"]["modified_files"]
def test_status_treats_windows_style_dangling_symlink_as_missing(self, tmp_path, monkeypatch):
from specify_cli.integration_status import _manifest_file_status
from specify_cli.integrations.manifest import IntegrationManifest
project = tmp_path / "proj"
project.mkdir()
tracked = project / "tracked.md"
tracked.write_text("content\n", encoding="utf-8")
regular_stat = tracked.lstat()
manifest = IntegrationManifest("test", project, version="test")
manifest.record_existing("tracked.md")
tracked.unlink()
try:
tracked.symlink_to(project / "missing-target")
except OSError as exc:
pytest.skip(f"symlinks unavailable: {exc}")
original_lstat = Path.lstat
original_is_symlink = Path.is_symlink
def windows_style_lstat(self):
if self == tracked:
return regular_stat
return original_lstat(self)
def windows_style_is_symlink(self):
if self == tracked:
return True
return original_is_symlink(self)
monkeypatch.setattr(Path, "lstat", windows_style_lstat)
monkeypatch.setattr(Path, "is_symlink", windows_style_is_symlink)
missing, modified, invalid, valid = _manifest_file_status(
manifest,
project.resolve(),
)
assert missing == ["tracked.md"]
assert modified == []
assert invalid == []
assert valid == ["tracked.md"]
def test_strip_extended_length_prefix_normalizes_windows_paths(self):
from specify_cli.integration_status import _strip_extended_length_prefix
# Build the prefixed strings explicitly so the test is meaningful on
# every platform (POSIX won't parse backslash separators, but the
# helper operates on the string form). Compare Path objects rather than
# their str() form: on Windows pathlib renders a UNC root with a
# trailing separator (``\\server\share\``), so an exact string match is
# brittle, whereas Path equality captures the intended semantics on
# both POSIX and Windows.
bs = "\\"
assert _strip_extended_length_prefix(
Path(f"{bs}{bs}?{bs}C:{bs}proj")
) == Path(f"C:{bs}proj")
assert _strip_extended_length_prefix(
Path(f"{bs}{bs}?{bs}UNC{bs}server{bs}share")
) == Path(f"{bs}{bs}server{bs}share")
# Paths without the prefix are returned unchanged.
assert _strip_extended_length_prefix(Path("relative/path")) == Path("relative/path")
def test_is_within_project_tolerates_extended_length_prefix(self):
from specify_cli.integration_status import _is_within_project
# A readlink result on POSIX never carries the prefix, so an in-project
# child is contained and an outside path is not. The Windows
# prefix-stripping branch is exercised by the dangling-symlink tests on
# Windows CI; here we lock in the cross-platform containment contract.
root = Path("/tmp/project").resolve()
assert _is_within_project(root, root / "child")
assert not _is_within_project(root, Path("/tmp/other").resolve())
def test_status_reports_unsafe_manifest_paths_without_hashing_them(self, tmp_path, copilot_project):
outside = tmp_path / "outside"
outside.mkdir()
(outside / "secret.txt").write_text("outside project\n", encoding="utf-8")
link = copilot_project / "outside-link"
try:
link.symlink_to(outside, target_is_directory=True)
except OSError as exc:
pytest.skip(f"symlinks unavailable: {exc}")
manifest_path = copilot_project / ".specify" / "integrations" / "copilot.manifest.json"
manifest_data = json.loads(manifest_path.read_text(encoding="utf-8"))
manifest_data["files"]["outside-link/secret.txt"] = "wrong"
manifest_path.write_text(json.dumps(manifest_data), encoding="utf-8")
result = _run_in_project(copilot_project, ["integration", "status", "--json"])
assert result.exit_code != 0
payload = json.loads(result.output)
assert payload["invalid_manifest_paths"] == 1
assert "outside-link/secret.txt" in payload["manifests"]["copilot"]["invalid_files"]
assert "outside-link/secret.txt" not in payload["manifests"]["copilot"]["modified_files"]
def test_status_reports_tracked_symlink_target_escape_as_invalid(self, tmp_path, copilot_project, monkeypatch):
outside = tmp_path / "outside"
outside.mkdir()
outside_file = outside / "secret.txt"
outside_file.write_text("outside project\n", encoding="utf-8")
manifest_path = copilot_project / ".specify" / "integrations" / "copilot.manifest.json"
tracked_files = json.loads(manifest_path.read_text(encoding="utf-8"))["files"]
first_rel = next(iter(tracked_files))
tracked_path = copilot_project / first_rel
tracked_path.unlink()
try:
tracked_path.symlink_to(outside_file)
except OSError as exc:
pytest.skip(f"symlinks unavailable: {exc}")
original_stat = Path.stat
def fail_tracked_symlink_stat(self, *args, **kwargs):
follows_symlinks = kwargs.get("follow_symlinks", True)
if self == tracked_path and follows_symlinks:
raise AssertionError("Path.stat() should not follow tracked symlinks")
return original_stat(self, *args, **kwargs)
monkeypatch.setattr(Path, "stat", fail_tracked_symlink_stat)
result = _run_in_project(copilot_project, ["integration", "status", "--json"])
assert result.exit_code != 0
payload = json.loads(result.output)
assert payload["invalid_manifest_paths"] == 1
assert first_rel in payload["manifests"]["copilot"]["invalid_files"]
assert first_rel not in payload["manifests"]["copilot"]["modified_files"]
def test_status_reports_unsafe_multi_install_combination(self, copilot_project):
from specify_cli.integrations.manifest import IntegrationManifest
state_path = copilot_project / ".specify" / "integration.json"
state = json.loads(state_path.read_text(encoding="utf-8"))
state["installed_integrations"] = ["copilot", "claude"]
state["default_integration"] = "copilot"
state["integration"] = "copilot"
state_path.write_text(json.dumps(state), encoding="utf-8")
IntegrationManifest("claude", copilot_project, version="test").save()
result = _run_in_project(copilot_project, ["integration", "status"])
assert result.exit_code != 0
assert "unsafe-multi-install" in result.output
assert "Multi-install safe: no" in result.output
assert "specify integration switch <key>" in result.output
def test_status_treats_unknown_multi_install_as_unsafe(self, claude_project):
from specify_cli.integrations.manifest import IntegrationManifest
state_path = claude_project / ".specify" / "integration.json"
state = json.loads(state_path.read_text(encoding="utf-8"))
state["installed_integrations"] = ["claude", "mystery"]
state["default_integration"] = "claude"
state["integration"] = "claude"
state_path.write_text(json.dumps(state), encoding="utf-8")
IntegrationManifest("mystery", claude_project, version="test").save()
result = _run_in_project(claude_project, ["integration", "status"])
assert result.exit_code != 0
assert "unknown-integration" in result.output
assert "unsafe-multi-install" in result.output
assert "remove the stale integration entry" in result.output
assert "Multi-install safe: no" in result.output
def test_status_gives_actionable_suggestion_for_unknown_manifest(self, claude_project):
state_path = claude_project / ".specify" / "integration.json"
state = json.loads(state_path.read_text(encoding="utf-8"))
state["installed_integrations"] = ["mystery"]
state["default_integration"] = "mystery"
state["integration"] = "mystery"
state_path.write_text(json.dumps(state), encoding="utf-8")
result = _run_in_project(claude_project, ["integration", "status", "--json"])
assert result.exit_code != 0
payload = json.loads(result.output)
manifest_finding = next(
item for item in payload["findings"]
if item["code"] == "manifest-missing" and item["integration"] == "mystery"
)
assert "remove the stale integration entry" in manifest_finding["suggestion"]
assert "integration upgrade mystery" not in manifest_finding["suggestion"]
def test_status_rejects_unsafe_integration_keys_before_manifest_lookup(self, tmp_path, claude_project):
state_path = claude_project / ".specify" / "integration.json"
unsafe_key = "../../../escape"
state_path.write_text(
json.dumps({
"integration": unsafe_key,
"default_integration": unsafe_key,
"installed_integrations": [unsafe_key],
}),
encoding="utf-8",
)
outside_manifest = tmp_path / "escape.manifest.json"
outside_manifest.write_text(
json.dumps({"integration": unsafe_key, "files": {}}),
encoding="utf-8",
)
result = _run_in_project(claude_project, ["integration", "status", "--json"])
assert result.exit_code != 0
payload = json.loads(result.output)
assert unsafe_key not in payload["manifests"]
assert payload["manifest_checked_integrations"] == ["speckit"]
assert any(
item["code"] == "integration-key-invalid"
and item["integration"] == unsafe_key
for item in payload["findings"]
)
def test_status_rejects_filename_invalid_integration_keys(self, claude_project):
state_path = claude_project / ".specify" / "integration.json"
unsafe_key = "bad:key"
state_path.write_text(
json.dumps({
"integration": unsafe_key,
"default_integration": unsafe_key,
"installed_integrations": [unsafe_key],
}),
encoding="utf-8",
)
result = _run_in_project(claude_project, ["integration", "status", "--json"])
assert result.exit_code != 0
payload = json.loads(result.output)
assert any(
item["code"] == "integration-key-invalid"
and item["integration"] == unsafe_key
for item in payload["findings"]
)
def test_status_rejects_windows_reserved_integration_keys(self, claude_project):
state_path = claude_project / ".specify" / "integration.json"
unsafe_key = "CON"
state_path.write_text(
json.dumps({
"integration": unsafe_key,
"default_integration": unsafe_key,
"installed_integrations": [unsafe_key],
}),
encoding="utf-8",
)
result = _run_in_project(claude_project, ["integration", "status", "--json"])
assert result.exit_code != 0
payload = json.loads(result.output)
assert any(
item["code"] == "integration-key-invalid"
and item["integration"] == unsafe_key
for item in payload["findings"]
)
def test_status_reports_managed_file_collisions(self, claude_project):
from specify_cli.integrations.manifest import IntegrationManifest
state_path = claude_project / ".specify" / "integration.json"
state = json.loads(state_path.read_text(encoding="utf-8"))
state["installed_integrations"] = ["claude", "codex"]
state["default_integration"] = "claude"
state["integration"] = "claude"
state_path.write_text(json.dumps(state), encoding="utf-8")
claude_manifest = claude_project / ".specify" / "integrations" / "claude.manifest.json"
tracked_files = json.loads(claude_manifest.read_text(encoding="utf-8"))["files"]
shared_rel = next(iter(tracked_files))
codex_manifest = IntegrationManifest("codex", claude_project, version="test")
codex_manifest.record_existing(shared_rel)
codex_manifest.save()
result = _run_in_project(claude_project, ["integration", "status"])
assert result.exit_code == 0
assert "managed-file-collision" in result.output
assert "Integration status: WARNING" in result.output
def test_status_json_is_not_rich_rendered(self, tmp_path, monkeypatch):
project = tmp_path / "proj"
project.mkdir()
(project / ".specify").mkdir()
(project / ".specify" / "integration.json").write_text(
json.dumps({
"integration": "[red]x[/red]",
"installed_integrations": ["[red]x[/red]"],
}),
encoding="utf-8",
)
monkeypatch.chdir(project)
result = runner.invoke(app, ["integration", "status", "--json"])
assert result.exit_code != 0
payload = json.loads(result.output)
assert payload["default_integration"] == "[red]x[/red]"
assert payload["installed_integrations"] == ["[red]x[/red]"]
def test_status_text_escapes_rich_markup_from_project_state(self, tmp_path, monkeypatch):
project = tmp_path / "proj"
project.mkdir()
(project / ".specify").mkdir()
(project / ".specify" / "integration.json").write_text(
json.dumps({
"integration": "[red]x[/red]",
"installed_integrations": ["[red]x[/red]"],
}),
encoding="utf-8",
)
monkeypatch.chdir(project)
result = runner.invoke(app, ["integration", "status"])
assert result.exit_code != 0
assert "Default integration: [red]x[/red]" in result.output
assert "Installed integrations: [red]x[/red]" in result.output
# ── install ──────────────────────────────────────────────────────────

View File

@@ -3087,424 +3087,6 @@ class TestExtensionCatalog:
assert captured["req"].get_header("Authorization") == "Bearer ghp_testtoken"
@pytest.mark.parametrize(
"payload",
[
# Root is not a JSON object.
[],
"oops",
42,
None,
# Root is fine but ``extensions`` is the wrong type.
{"schema_version": "1.0", "extensions": []},
{"schema_version": "1.0", "extensions": "oops"},
{"schema_version": "1.0", "extensions": None},
{"schema_version": "1.0", "extensions": 42},
],
)
def test_fetch_single_catalog_rejects_malformed_payload(self, temp_dir, payload):
"""Malformed catalog payloads raise ExtensionError, not AttributeError.
Without this guard, a payload like ``{"extensions": []}`` would pass the
key-presence check and then crash with ``AttributeError: 'list' object
has no attribute 'items'`` deep inside ``_get_merged_extensions``. The
sibling integration catalog reader already validates both the root
object and the nested mapping (see ``integrations/catalog.py``); the
extension catalog must stay consistent.
"""
from unittest.mock import patch, MagicMock
catalog = self._make_catalog(temp_dir)
mock_response = MagicMock()
mock_response.read.return_value = json.dumps(payload).encode()
mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False)
entry = CatalogEntry(
url="https://example.com/catalog.json",
name="default",
priority=1,
install_allowed=True,
)
with patch.object(catalog, "_open_url", return_value=mock_response):
with pytest.raises(ExtensionError, match="Invalid catalog format"):
catalog._fetch_single_catalog(entry, force_refresh=True)
@pytest.mark.parametrize(
"cached_payload",
[
[],
"oops",
42,
None,
{"schema_version": "1.0", "extensions": []},
{"schema_version": "1.0", "extensions": "oops"},
{"schema_version": "1.0", "extensions": None},
],
)
def test_fetch_single_catalog_rejects_malformed_cached_payload(
self, temp_dir, cached_payload
):
"""A poisoned cache silently falls back to the network instead of
crashing — cached payloads pass through the same shape validation
as freshly-fetched ones.
Without this, a cache poisoned by an older spec-kit version (or a
manual edit, or an upstream that briefly served a bad payload
before the network guards landed) would re-crash every invocation
of ``_get_merged_extensions`` despite the cache being "valid" by
age. The recovery contract is: if the cached payload fails
validation, drop it and refetch — never propagate
``AttributeError`` to the caller.
"""
from unittest.mock import patch, MagicMock
catalog = self._make_catalog(temp_dir)
# Poison the default-URL cache. ``DEFAULT_CATALOG_URL`` is the
# branch that goes through ``is_cache_valid()`` (the non-default
# branch uses per-URL hashed cache files but the same code path
# below).
catalog.cache_dir.mkdir(parents=True, exist_ok=True)
catalog.cache_file.write_text(json.dumps(cached_payload))
catalog.cache_metadata_file.write_text(
json.dumps(
{
"cached_at": datetime.now(timezone.utc).isoformat(),
"catalog_url": ExtensionCatalog.DEFAULT_CATALOG_URL,
}
)
)
# Network refetch returns a valid payload so the recovery path
# can complete.
valid = {
"schema_version": "1.0",
"extensions": {"foo": {"name": "Foo", "version": "1.0.0"}},
}
mock_response = MagicMock()
mock_response.read.return_value = json.dumps(valid).encode()
mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False)
entry = CatalogEntry(
url=ExtensionCatalog.DEFAULT_CATALOG_URL,
name="default",
priority=1,
install_allowed=True,
)
with patch.object(catalog, "_open_url", return_value=mock_response):
result = catalog._fetch_single_catalog(entry, force_refresh=False)
# The poisoned cache was discarded and the network payload returned.
assert result == valid
@pytest.mark.parametrize(
"payload",
[
# Root is not a JSON object.
[],
"oops",
42,
None,
# Root is fine but ``extensions`` is the wrong type.
{"schema_version": "1.0", "extensions": []},
{"schema_version": "1.0", "extensions": "oops"},
{"schema_version": "1.0", "extensions": None},
],
)
def test_fetch_catalog_rejects_malformed_payload(self, temp_dir, payload):
"""Legacy ``fetch_catalog`` reuses the same shape-validation helper.
Before this change ``fetch_catalog`` only checked key presence — so
a payload like ``42`` would crash with
``TypeError: argument of type 'int' is not iterable`` during the
``"schema_version" in catalog_data`` check, and an entry mapping
of the wrong type would crash downstream. Reusing
``_validate_catalog_payload`` keeps the network-side behaviour of
the legacy single-catalog method consistent with the multi-catalog
``_fetch_single_catalog`` path.
"""
from unittest.mock import patch, MagicMock
catalog = self._make_catalog(temp_dir)
mock_response = MagicMock()
mock_response.read.return_value = json.dumps(payload).encode()
mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False)
with patch.object(catalog, "_open_url", return_value=mock_response):
with pytest.raises(ExtensionError, match="Invalid catalog format"):
catalog.fetch_catalog(force_refresh=True)
def test_fetch_catalog_recovers_from_unreadable_cache(self, temp_dir):
"""An unreadable / wrong-encoded cache file silently refetches.
The cache contract is best-effort: a JSON-decode failure, an OS
read failure (permissions / disk / handle limit), or an invalid
text encoding on a cache file written by an older client must
all fall through to the network fetch rather than crash the
caller. Covers Copilot's review point that the previous
``except (json.JSONDecodeError,)`` was too narrow.
"""
from unittest.mock import patch, MagicMock
catalog = self._make_catalog(temp_dir)
# Write invalid UTF-8 bytes to the cache file so ``read_text``
# raises ``UnicodeDecodeError`` (a subclass of ``UnicodeError``).
catalog.cache_dir.mkdir(parents=True, exist_ok=True)
catalog.cache_file.write_bytes(b"\xff\xfe\x00not-utf-8")
catalog.cache_metadata_file.write_text(
json.dumps(
{
"cached_at": datetime.now(timezone.utc).isoformat(),
"catalog_url": ExtensionCatalog.DEFAULT_CATALOG_URL,
}
),
encoding="utf-8",
)
valid = {
"schema_version": "1.0",
"extensions": {"foo": {"name": "Foo", "version": "1.0.0"}},
}
mock_response = MagicMock()
mock_response.read.return_value = json.dumps(valid).encode()
mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False)
with patch.object(catalog, "_open_url", return_value=mock_response):
result = catalog.fetch_catalog(force_refresh=False)
# Recovered via network rather than crashing on the unreadable cache.
assert result == valid
def test_fetch_catalog_recovers_from_unreadable_metadata(self, temp_dir):
"""A wrongly-encoded metadata file degrades to a cache miss.
``is_cache_valid`` is consulted *before* the cache payload is
read; if the metadata file itself can't be decoded (e.g. it was
written on a Windows host whose default codec isn't UTF-8) the
validity check must return ``False`` rather than propagate
``UnicodeDecodeError``. Without that guard, a corrupted metadata
file would crash every invocation instead of falling through to
a network refetch.
"""
from unittest.mock import patch, MagicMock
catalog = self._make_catalog(temp_dir)
catalog.cache_dir.mkdir(parents=True, exist_ok=True)
catalog.cache_file.write_text("{}", encoding="utf-8")
# Bytes that are not valid UTF-8 — ``read_text(encoding="utf-8")``
# will raise ``UnicodeDecodeError`` (subclass of ``UnicodeError``).
catalog.cache_metadata_file.write_bytes(b"\xff\xfe\x00bad")
# is_cache_valid must absorb the decode failure, not crash.
assert catalog.is_cache_valid() is False
valid = {
"schema_version": "1.0",
"extensions": {"foo": {"name": "Foo", "version": "1.0.0"}},
}
mock_response = MagicMock()
mock_response.read.return_value = json.dumps(valid).encode()
mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False)
with patch.object(catalog, "_open_url", return_value=mock_response):
result = catalog.fetch_catalog(force_refresh=False)
assert result == valid
@pytest.mark.parametrize(
"non_mapping_metadata",
[
"[]", # JSON array
'"oops"', # JSON string
"42", # JSON number
"true", # JSON bool
"null", # JSON null
],
)
def test_is_cache_valid_handles_non_mapping_metadata(
self, temp_dir, non_mapping_metadata
):
"""Metadata that parses to a non-mapping degrades to cache-invalid.
The cache-validity check calls ``metadata.get("cached_at", "")``
immediately after ``json.loads``. If the metadata file is valid
JSON but parses to a non-mapping (``[]``, ``"oops"``, ``42``,
``true``, ``null``), ``.get`` raises ``AttributeError`` — which
previously slipped past the except tuple and crashed the
caller. The contract documented on ``is_cache_valid`` says any
decode/shape failure should return ``False`` so ``fetch_catalog``
falls through to a network refetch. This test pins that
contract across every JSON non-mapping root type so a regression
in the except clause can't silently re-introduce the crash.
"""
catalog = self._make_catalog(temp_dir)
catalog.cache_dir.mkdir(parents=True, exist_ok=True)
catalog.cache_file.write_text("{}", encoding="utf-8")
catalog.cache_metadata_file.write_text(
non_mapping_metadata, encoding="utf-8"
)
# Must not raise — the contract is "any decode/shape failure → False".
assert catalog.is_cache_valid() is False
def test_fetch_catalog_writes_cache_as_utf8(self, temp_dir, monkeypatch):
"""Cache + metadata writes pass ``encoding="utf-8"``, observably.
The earlier version of this test claimed to assert UTF-8 at the
byte level but actually only round-tripped a non-ASCII string
through ``json.dumps`` and ``read_text(encoding="utf-8")``.
Because ``json.dumps`` defaults to ``ensure_ascii=True``, "café"
was serialized as the all-ASCII escape ``caf\\u00e9`` before it
ever reached ``write_text`` — the bytes on disk were identical
regardless of the encoding kwarg, so a locale-encoded write
would have round-tripped just fine. The drift Copilot's review
flagged wasn't actually being caught.
Fix: directly observe the ``encoding`` argument passed to every
``write_text`` call made against the cache directory. This is
the production code's encoding choice, which is exactly what
the regression guard cares about; non-ASCII payload tricks are
unnecessary because the assertion is about the kwarg, not the
bytes.
"""
from unittest.mock import patch, MagicMock
from pathlib import Path as _PathCls
catalog = self._make_catalog(temp_dir)
payload = {
"schema_version": "1.0",
"extensions": {"foo": {"name": "Foo", "version": "1.0.0"}},
}
mock_response = MagicMock()
mock_response.read.return_value = json.dumps(payload).encode("utf-8")
mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False)
# Record every ``write_text`` call's encoding kwarg so the
# assertion observes the production writer's argument directly.
recorded: list[dict] = []
real_write_text = _PathCls.write_text
def recording_write_text(self, data, *args, **kwargs):
recorded.append(
{"path": str(self), "encoding": kwargs.get("encoding")}
)
return real_write_text(self, data, *args, **kwargs)
monkeypatch.setattr(_PathCls, "write_text", recording_write_text)
with patch.object(catalog, "_open_url", return_value=mock_response):
catalog.fetch_catalog(force_refresh=True)
# Filter to writes inside the catalog's cache directory so
# unrelated writes from other machinery don't pollute the
# assertion.
cache_writes = [
r for r in recorded if str(catalog.cache_dir) in r["path"]
]
assert cache_writes, "fetch_catalog made no writes to the cache dir"
for record in cache_writes:
assert record["encoding"] == "utf-8", (
f"write_text on {record['path']} used encoding "
f"{record['encoding']!r}; expected 'utf-8'"
)
def test_fetch_catalog_survives_unwritable_cache(self, temp_dir, monkeypatch):
"""An unwritable cache dir doesn't fail a successful fetch.
Cache writes are best-effort, mirroring the read side and the
``integrations/catalog.py`` precedent: if ``mkdir``/``write_text``
raises ``OSError`` (read-only checkout, permissions), the
already-fetched-and-validated payload must still be returned
rather than surfacing the cache failure to the caller.
"""
from unittest.mock import patch, MagicMock
from pathlib import Path as _PathCls
catalog = self._make_catalog(temp_dir)
valid = {
"schema_version": "1.0",
"extensions": {"foo": {"name": "Foo", "version": "1.0.0"}},
}
mock_response = MagicMock()
mock_response.read.return_value = json.dumps(valid).encode()
mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False)
# Simulate an unwritable cache dir: every write_text under the
# cache directory raises PermissionError (an OSError subclass).
real_write_text = _PathCls.write_text
def failing_write_text(self, data, *args, **kwargs):
if str(catalog.cache_dir) in str(self):
raise PermissionError("cache dir is read-only")
return real_write_text(self, data, *args, **kwargs)
monkeypatch.setattr(_PathCls, "write_text", failing_write_text)
with patch.object(catalog, "_open_url", return_value=mock_response):
# Legacy single-catalog path.
assert catalog.fetch_catalog(force_refresh=True) == valid
# Multi-catalog path.
entry = CatalogEntry(
url="https://example.com/catalog.json",
name="default",
priority=1,
install_allowed=True,
)
assert catalog._fetch_single_catalog(entry, force_refresh=True) == valid
def test_get_merged_extensions_skips_non_mapping_entries(self, temp_dir):
"""Per-entry guard: one malformed entry shouldn't poison the merge.
``_fetch_single_catalog`` validates that ``extensions`` is a mapping,
but it doesn't (and shouldn't) validate every entry inside it — a
single bad entry in an otherwise-valid catalog should be skipped, not
crash the whole resolve path. Mirrors the per-entry skip in
``integrations/catalog.py``: a malformed entry returns no error,
valid entries continue to merge normally.
"""
from unittest.mock import patch, MagicMock
catalog = self._make_catalog(temp_dir)
# Mix of valid entry, list-shaped entry, and string-shaped entry.
payload = {
"schema_version": "1.0",
"extensions": {
"good": {"name": "Good", "version": "1.0.0"},
"bad-list": [],
"bad-str": "oops",
},
}
mock_response = MagicMock()
mock_response.read.return_value = json.dumps(payload).encode()
mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False)
entry = CatalogEntry(
url="https://example.com/catalog.json",
name="default",
priority=1,
install_allowed=True,
)
with patch.object(catalog, "_open_url", return_value=mock_response), \
patch.object(catalog, "get_active_catalogs", return_value=[entry]):
merged = catalog._get_merged_extensions(force_refresh=True)
# Only the well-formed entry survives; the two malformed entries are
# silently dropped rather than raising or crashing.
assert [ext["id"] for ext in merged] == ["good"]
def test_download_extension_sends_auth_header(self, temp_dir, monkeypatch):
"""download_extension passes Authorization header when a provider is configured."""
from unittest.mock import patch, MagicMock

View File

@@ -1514,421 +1514,6 @@ class TestPresetCatalog:
assert captured["req"].get_header("Authorization") == "Bearer ghp_testtoken"
@pytest.mark.parametrize(
"payload",
[
# Root is not a JSON object.
[],
"oops",
42,
None,
# Root is fine but ``presets`` is the wrong type.
{"schema_version": "1.0", "presets": []},
{"schema_version": "1.0", "presets": "oops"},
{"schema_version": "1.0", "presets": None},
{"schema_version": "1.0", "presets": 42},
],
)
def test_fetch_single_catalog_rejects_malformed_payload(self, project_dir, payload):
"""Malformed catalog payloads raise PresetError, not AttributeError.
Without this guard, a payload like ``{"presets": []}`` would pass the
key-presence check and then crash with ``AttributeError: 'list' object
has no attribute 'items'`` deep inside ``_get_merged_packs``. The
sibling integration catalog reader already validates both the root
object and the nested mapping (see ``integrations/catalog.py``); the
preset catalog must stay consistent.
"""
from unittest.mock import patch, MagicMock
catalog = PresetCatalog(project_dir)
mock_response = MagicMock()
mock_response.read.return_value = json.dumps(payload).encode()
mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False)
entry = PresetCatalogEntry(
url="https://example.com/catalog.json",
name="default",
priority=1,
install_allowed=True,
)
with patch.object(catalog, "_open_url", return_value=mock_response):
with pytest.raises(PresetError, match="Invalid preset catalog format"):
catalog._fetch_single_catalog(entry, force_refresh=True)
@pytest.mark.parametrize(
"cached_payload",
[
[],
"oops",
42,
None,
{"schema_version": "1.0", "presets": []},
{"schema_version": "1.0", "presets": "oops"},
{"schema_version": "1.0", "presets": None},
],
)
def test_fetch_single_catalog_rejects_malformed_cached_payload(
self, project_dir, cached_payload
):
"""A poisoned cache silently falls back to the network instead of
crashing — cached payloads pass through the same shape validation
as freshly-fetched ones.
Without this, a cache poisoned by an older spec-kit version (or a
manual edit, or an upstream that briefly served a bad payload
before the network guards landed) would re-crash every invocation
of ``_get_merged_packs`` despite the cache being "valid" by age.
The recovery contract is: if the cached payload fails validation,
drop it and refetch — never propagate ``AttributeError`` to the
caller.
"""
from unittest.mock import patch, MagicMock
catalog = PresetCatalog(project_dir)
# Poison the default-URL cache. ``DEFAULT_CATALOG_URL`` and
# non-default URLs both flow through the same cache-load branch.
cache_file, metadata_file = catalog._get_cache_paths(
catalog.DEFAULT_CATALOG_URL
)
cache_file.parent.mkdir(parents=True, exist_ok=True)
cache_file.write_text(json.dumps(cached_payload))
metadata_file.write_text(
json.dumps(
{
"cached_at": datetime.now(timezone.utc).isoformat(),
"catalog_url": catalog.DEFAULT_CATALOG_URL,
}
)
)
# Network refetch returns a valid payload so the recovery path
# can complete.
valid = {
"schema_version": "1.0",
"presets": {"foo": {"name": "Foo", "version": "1.0.0"}},
}
mock_response = MagicMock()
mock_response.read.return_value = json.dumps(valid).encode()
mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False)
entry = PresetCatalogEntry(
url=catalog.DEFAULT_CATALOG_URL,
name="default",
priority=1,
install_allowed=True,
)
with patch.object(catalog, "_open_url", return_value=mock_response):
result = catalog._fetch_single_catalog(entry, force_refresh=False)
# The poisoned cache was discarded and the network payload returned.
assert result == valid
@pytest.mark.parametrize(
"payload",
[
# Root is not a JSON object.
[],
"oops",
42,
None,
# Root is fine but ``presets`` is the wrong type.
{"schema_version": "1.0", "presets": []},
{"schema_version": "1.0", "presets": "oops"},
{"schema_version": "1.0", "presets": None},
],
)
def test_fetch_catalog_rejects_malformed_payload(self, project_dir, payload):
"""Legacy ``fetch_catalog`` reuses the same shape-validation helper.
Before this change ``fetch_catalog`` only checked key presence —
so a payload like ``42`` would crash with
``TypeError: argument of type 'int' is not iterable`` during the
``"schema_version" in catalog_data`` check, and an entry mapping
of the wrong type would crash downstream. Reusing
``_validate_catalog_payload`` keeps the network-side behaviour of
the legacy single-catalog method consistent with the multi-catalog
``_fetch_single_catalog`` path.
"""
from unittest.mock import patch, MagicMock
catalog = PresetCatalog(project_dir)
mock_response = MagicMock()
mock_response.read.return_value = json.dumps(payload).encode()
mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False)
with patch.object(catalog, "_open_url", return_value=mock_response):
with pytest.raises(PresetError, match="Invalid preset catalog format"):
catalog.fetch_catalog(force_refresh=True)
def test_fetch_catalog_recovers_from_unreadable_cache(self, project_dir):
"""An unreadable / wrong-encoded cache file silently refetches.
The cache contract is best-effort: a JSON-decode failure, an OS
read failure (permissions / disk / handle limit), or an invalid
text encoding on a cache file written by an older client must
all fall through to the network fetch rather than crash the
caller. Covers Copilot's review point that the previous
``except (json.JSONDecodeError, OSError)`` was missing
``UnicodeError``.
"""
from unittest.mock import patch, MagicMock
catalog = PresetCatalog(project_dir)
catalog.cache_dir.mkdir(parents=True, exist_ok=True)
# Invalid UTF-8 bytes so ``read_text`` raises ``UnicodeDecodeError``
# (a subclass of ``UnicodeError``).
catalog.cache_file.write_bytes(b"\xff\xfe\x00not-utf-8")
catalog.cache_metadata_file.write_text(
json.dumps(
{
"cached_at": datetime.now(timezone.utc).isoformat(),
"catalog_url": catalog.get_catalog_url(),
}
),
encoding="utf-8",
)
valid = {
"schema_version": "1.0",
"presets": {"foo": {"name": "Foo", "version": "1.0.0"}},
}
mock_response = MagicMock()
mock_response.read.return_value = json.dumps(valid).encode()
mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False)
with patch.object(catalog, "_open_url", return_value=mock_response):
result = catalog.fetch_catalog(force_refresh=False)
# Recovered via network rather than crashing on the unreadable cache.
assert result == valid
def test_fetch_catalog_recovers_from_unreadable_metadata(self, project_dir):
"""A wrongly-encoded metadata file degrades to a cache miss.
``is_cache_valid`` is consulted *before* the cache payload is
read; if the metadata file itself can't be decoded (e.g. it was
written on a host whose default codec isn't UTF-8) the validity
check must return ``False`` rather than propagate
``UnicodeDecodeError``. Without that guard, a corrupted metadata
file would crash every invocation instead of falling through to
a network refetch.
"""
from unittest.mock import patch, MagicMock
catalog = PresetCatalog(project_dir)
catalog.cache_dir.mkdir(parents=True, exist_ok=True)
catalog.cache_file.write_text("{}", encoding="utf-8")
# Bytes that are not valid UTF-8 — ``read_text(encoding="utf-8")``
# will raise ``UnicodeDecodeError`` (subclass of ``UnicodeError``).
catalog.cache_metadata_file.write_bytes(b"\xff\xfe\x00bad")
# is_cache_valid must absorb the decode failure, not crash.
assert catalog.is_cache_valid() is False
valid = {
"schema_version": "1.0",
"presets": {"foo": {"name": "Foo", "version": "1.0.0"}},
}
mock_response = MagicMock()
mock_response.read.return_value = json.dumps(valid).encode()
mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False)
with patch.object(catalog, "_open_url", return_value=mock_response):
result = catalog.fetch_catalog(force_refresh=False)
assert result == valid
@pytest.mark.parametrize(
"non_mapping_metadata",
[
"[]", # JSON array
'"oops"', # JSON string
"42", # JSON number
"true", # JSON bool
"null", # JSON null
],
)
def test_is_cache_valid_handles_non_mapping_metadata(
self, project_dir, non_mapping_metadata
):
"""Metadata that parses to a non-mapping degrades to cache-invalid.
The cache-validity check calls ``metadata.get("cached_at", "")``
immediately after ``json.loads``. If the metadata file is valid
JSON but parses to a non-mapping (``[]``, ``"oops"``, ``42``,
``true``, ``null``), ``.get`` raises ``AttributeError`` — which
previously slipped past the except tuple and crashed the
caller. The contract documented on ``is_cache_valid`` says any
decode/shape failure should return ``False`` so ``fetch_catalog``
falls through to a network refetch. This test pins that
contract across every JSON non-mapping root type.
"""
catalog = PresetCatalog(project_dir)
catalog.cache_dir.mkdir(parents=True, exist_ok=True)
catalog.cache_file.write_text("{}", encoding="utf-8")
catalog.cache_metadata_file.write_text(
non_mapping_metadata, encoding="utf-8"
)
# Must not raise — the contract is "any decode/shape failure → False".
assert catalog.is_cache_valid() is False
def test_fetch_catalog_writes_cache_as_utf8(self, project_dir, monkeypatch):
"""Cache + metadata writes pass ``encoding="utf-8"``, observably.
The earlier version of this test claimed to assert UTF-8 at the
byte level but actually only round-tripped a non-ASCII string
through ``json.dumps`` and ``read_text(encoding="utf-8")``.
Because ``json.dumps`` defaults to ``ensure_ascii=True``, "café"
was serialized as the all-ASCII escape ``caf\\u00e9`` before it
ever reached ``write_text`` — the bytes on disk were identical
regardless of the encoding kwarg. The drift Copilot's review
flagged wasn't actually being caught.
Fix: directly observe the ``encoding`` argument passed to every
``write_text`` call made against the cache directory. This is
the production code's encoding choice, which is exactly what
the regression guard cares about.
"""
from unittest.mock import patch, MagicMock
from pathlib import Path as _PathCls
catalog = PresetCatalog(project_dir)
payload = {
"schema_version": "1.0",
"presets": {"foo": {"name": "Foo", "version": "1.0.0"}},
}
mock_response = MagicMock()
mock_response.read.return_value = json.dumps(payload).encode("utf-8")
mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False)
# Record every ``write_text`` call's encoding kwarg so the
# assertion observes the production writer's argument directly.
recorded: list[dict] = []
real_write_text = _PathCls.write_text
def recording_write_text(self, data, *args, **kwargs):
recorded.append(
{"path": str(self), "encoding": kwargs.get("encoding")}
)
return real_write_text(self, data, *args, **kwargs)
monkeypatch.setattr(_PathCls, "write_text", recording_write_text)
with patch.object(catalog, "_open_url", return_value=mock_response):
catalog.fetch_catalog(force_refresh=True)
cache_writes = [
r for r in recorded if str(catalog.cache_dir) in r["path"]
]
assert cache_writes, "fetch_catalog made no writes to the cache dir"
for record in cache_writes:
assert record["encoding"] == "utf-8", (
f"write_text on {record['path']} used encoding "
f"{record['encoding']!r}; expected 'utf-8'"
)
def test_fetch_catalog_survives_unwritable_cache(self, project_dir, monkeypatch):
"""An unwritable cache dir doesn't fail a successful fetch.
Cache writes are best-effort, mirroring the read side and the
``integrations/catalog.py`` precedent: if ``mkdir``/``write_text``
raises ``OSError`` (read-only checkout, permissions), the
already-fetched-and-validated payload must still be returned —
not swallowed into the broad except and re-raised as a
``PresetError``.
"""
from unittest.mock import patch, MagicMock
from pathlib import Path as _PathCls
catalog = PresetCatalog(project_dir)
valid = {
"schema_version": "1.0",
"presets": {"foo": {"name": "Foo", "version": "1.0.0"}},
}
mock_response = MagicMock()
mock_response.read.return_value = json.dumps(valid).encode()
mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False)
# Simulate an unwritable cache dir: every write_text under the
# cache directory raises PermissionError (an OSError subclass).
real_write_text = _PathCls.write_text
def failing_write_text(self, data, *args, **kwargs):
if str(catalog.cache_dir) in str(self):
raise PermissionError("cache dir is read-only")
return real_write_text(self, data, *args, **kwargs)
monkeypatch.setattr(_PathCls, "write_text", failing_write_text)
with patch.object(catalog, "_open_url", return_value=mock_response):
# Legacy single-catalog path.
assert catalog.fetch_catalog(force_refresh=True) == valid
# Multi-catalog path.
entry = PresetCatalogEntry(
url=catalog.DEFAULT_CATALOG_URL,
name="default",
priority=1,
install_allowed=True,
)
assert (
catalog._fetch_single_catalog(entry, force_refresh=True) == valid
)
def test_get_merged_packs_skips_non_mapping_entries(self, project_dir):
"""Per-entry guard: one malformed entry shouldn't poison the merge.
``_fetch_single_catalog`` validates that ``presets`` is a mapping,
but it doesn't (and shouldn't) validate every entry inside it — a
single bad entry in an otherwise-valid catalog should be skipped,
not crash the whole resolve path. Mirrors the per-entry skip in
``integrations/catalog.py``: a malformed entry returns no error,
valid entries continue to merge normally.
"""
from unittest.mock import patch, MagicMock
catalog = PresetCatalog(project_dir)
payload = {
"schema_version": "1.0",
"presets": {
"good": {"name": "Good", "version": "1.0.0"},
"bad-list": [],
"bad-str": "oops",
},
}
mock_response = MagicMock()
mock_response.read.return_value = json.dumps(payload).encode()
mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False)
entry = PresetCatalogEntry(
url="https://example.com/catalog.json",
name="default",
priority=1,
install_allowed=True,
)
with patch.object(catalog, "_open_url", return_value=mock_response), \
patch.object(catalog, "get_active_catalogs", return_value=[entry]):
merged = catalog._get_merged_packs(force_refresh=True)
# Only the well-formed entry survives; the two malformed entries are
# silently dropped rather than raising or crashing.
assert list(merged.keys()) == ["good"]
def test_download_pack_sends_auth_header(self, project_dir, monkeypatch):
"""download_pack passes Authorization header when configured."""
from unittest.mock import patch, MagicMock