mirror of
https://github.com/github/spec-kit.git
synced 2026-07-03 12:28:06 +08:00
Support controlled multi-install for safe AI agent integrations (#2389)
* support controlled multi-install integrations * fix: harden multi-install integration state * refactor: isolate integration runtime helpers * fix: address copilot review feedback * fix: address follow-up copilot feedback * fix: tighten integration switch semantics * fix: address final copilot review feedback * fix: harden integration manifest read errors * fix: refuse symlinked shared infra paths * test: filter expected self-test preset warning * test: address copilot review nits * refactor: centralize safe shared infra writes * fix: use no-follow writes for shared infra * fix: keep default integration atomic on template refresh * fix: harden shared infra error paths * fix: preflight shared infra and future state schemas * fix: support nested shared scripts during preflight * test: tolerate wrapped schema error output * fix: use safe default mode for shared text writes * fix: use posix paths in shared skip output * fix: share project guard for integration use * fix: centralize spec-kit project guards * fix: use posix project paths in cli output * fix: harden shared manifest and upgrade refresh
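--- a/src/specify_cli/integration_runtime.py
+++ b/src/specify_cli/integration_runtime.py
@@ -0,0 +1,90 @@
+"""Runtime helpers for integration commands."""
+
+from __future__ import annotations
+
+from collections.abc import Callable
+from typing import Any
+
+from .integration_state import integration_setting, integration_settings
+
+
+ParseOptions = Callable[[Any, str], dict[str, Any] | None]
+
+
+def resolve_integration_options(
+integration: Any,
+state: dict[str, Any],
+key: str,
+raw_options: str | None,
+*,
+parse_options: ParseOptions,
+) -> tuple[str | None, dict[str, Any] | None]:
+"""Resolve raw and parsed options for an integration operation."""
+if raw_options is not None:
+return raw_options, parse_options(integration, raw_options)
+
+setting = integration_setting(state, key)
+stored_raw = setting.get("raw_options")
+if not isinstance(stored_raw, str):
+stored_raw = None
+
+stored_parsed = setting.get("parsed_options")
+if isinstance(stored_parsed, dict):
+return stored_raw, stored_parsed or None
+
+if stored_raw:
+return stored_raw, parse_options(integration, stored_raw)
+
+return None, None
+
+
+def with_integration_setting(
+state: dict[str, Any],
+key: str,
+integration: Any,
+*,
+script_type: str | None = None,
+raw_options: str | None = None,
+parsed_options: dict[str, Any] | None = None,
+) -> dict[str, dict[str, Any]]:
+"""Return integration settings with *key* updated."""
+settings = integration_settings(state)
+current = dict(settings.get(key, {}))
+
+if script_type:
+current["script"] = script_type
+if raw_options is not None:
+current["raw_options"] = raw_options
+elif "raw_options" in current and not current.get("raw_options"):
+current.pop("raw_options", None)
+
+if parsed_options is not None:
+current["parsed_options"] = parsed_options
+elif raw_options is not None:
+current.pop("parsed_options", None)
+
+current["invoke_separator"] = integration.effective_invoke_separator(parsed_options)
+settings[key] = current
+return settings
+
+
+def invoke_separator_for_integration(
+integration: Any,
+state: dict[str, Any],
+key: str,
+parsed_options: dict[str, Any] | None = None,
+) -> str:
+"""Resolve the invocation separator for stored/default integration state."""
+if parsed_options is not None:
+return integration.effective_invoke_separator(parsed_options)
+
+setting = integration_setting(state, key)
+stored_separator = setting.get("invoke_separator")
+if isinstance(stored_separator, str) and stored_separator:
+return stored_separator
+
+stored_parsed = setting.get("parsed_options")
+if isinstance(stored_parsed, dict):
+return integration.effective_invoke_separator(stored_parsed)
+
+return integration.effective_invoke_separator(None)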
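Review bot: `resolve_integration_options` returns `stored_parsed` straight from the on-disk state whenever it is a dict, so options read from `.specify/integration.json` never pass through `parse_options`, while options given by the caller always do. A hand-edited or checked-in state file can therefore hand an integration values its own parser would have rejected; when `stored_raw` is present, re-deriving with `parse_options(integration, stored_raw)` would keep a single validation path (skipping the re-parse may be deliberate, which is not visible here). Separately, `with_integration_setting` computes the separator from the `parsed_options` argument even when it is `None` and `current` still holds stored parsed options; `current["invoke_separator"] = integration.effective_invoke_separator(current.get("parsed_options"))` would agree with the fallback order in `invoke_separator_for_integration`.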
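--- a/src/specify_cli/integration_state.py
+++ b/src/specify_cli/integration_state.py
@@ -0,0 +1,161 @@
+"""State helpers for installed AI agent integrations."""
+
+from __future__ import annotations
+
+import json
+from pathlib import Path
+from typing import Any
+
+
+INTEGRATION_JSON = ".specify/integration.json"
+INTEGRATION_STATE_SCHEMA = 1
+
+
+def clean_integration_key(key: Any) -> str | None:
+"""Return a stripped integration key, or None for empty/non-string values."""
+if not isinstance(key, str) or not key.strip():
+return None
+return key.strip()
+
+
+def dedupe_integration_keys(keys: list[Any]) -> list[str]:
+"""Return a de-duplicated list of non-empty integration keys."""
+seen: set[str] = set()
+deduped: list[str] = []
+for key in keys:
+clean = clean_integration_key(key)
+if clean is None:
+continue
+if clean in seen:
+continue
+seen.add(clean)
+deduped.append(clean)
+return deduped
+
+
+def normalize_integration_settings(settings: Any) -> dict[str, dict[str, Any]]:
+"""Return JSON-safe per-integration runtime settings."""
+if not isinstance(settings, dict):
+return {}
+
+normalized: dict[str, dict[str, Any]] = {}
+for key, value in settings.items():
+if not isinstance(key, str) or not key.strip() or not isinstance(value, dict):
+continue
+
+clean: dict[str, Any] = {}
+script = value.get("script")
+if isinstance(script, str) and script.strip():
+clean["script"] = script.strip()
+
+raw_options = value.get("raw_options")
+if isinstance(raw_options, str):
+clean["raw_options"] = raw_options
+
+parsed_options = value.get("parsed_options")
+if isinstance(parsed_options, dict):
+clean["parsed_options"] = parsed_options
+
+invoke_separator = value.get("invoke_separator")
+if isinstance(invoke_separator, str) and invoke_separator.strip():
+clean["invoke_separator"] = invoke_separator.strip()
+
+if clean:
+normalized[key.strip()] = clean
+
+return normalized
+
+
+def _normalized_integration_state_schema(value: Any) -> int:
+if isinstance(value, int) and not isinstance(value, bool) and value > INTEGRATION_STATE_SCHEMA:
+return value
+return INTEGRATION_STATE_SCHEMA
+
+
+def normalize_integration_state(data: dict[str, Any]) -> dict[str, Any]:
+"""Normalize legacy and multi-install integration metadata."""
+legacy_key = clean_integration_key(data.get("integration"))
+default_key = clean_integration_key(data.get("default_integration")) or legacy_key
+
+installed = data.get("installed_integrations")
+installed_keys = dedupe_integration_keys(installed if isinstance(installed, list) else [])
+if not default_key and installed_keys:
+default_key = installed_keys[0]
+if default_key and default_key not in installed_keys:
+installed_keys.insert(0, default_key)
+
+settings = normalize_integration_settings(data.get("integration_settings"))
+
+normalized = dict(data)
+normalized["integration_state_schema"] = _normalized_integration_state_schema(
+data.get("integration_state_schema")
+)
+if default_key:
+normalized["integration"] = default_key
+normalized["default_integration"] = default_key
+else:
+normalized.pop("integration", None)
+normalized.pop("default_integration", None)
+normalized["installed_integrations"] = installed_keys
+normalized["integration_settings"] = {
+key: settings[key] for key in installed_keys if key in settings
+}
+return normalized
+
+
+def default_integration_key(state: dict[str, Any]) -> str | None:
+"""Return the default integration key from normalized state."""
+key = state.get("default_integration") or state.get("integration")
+return clean_integration_key(key)
+
+
+def installed_integration_keys(state: dict[str, Any]) -> list[str]:
+"""Return installed integration keys from normalized state."""
+return dedupe_integration_keys(state.get("installed_integrations", []))
+
+
+def integration_settings(state: dict[str, Any]) -> dict[str, dict[str, Any]]:
+"""Return normalized per-integration settings from state."""
+return normalize_integration_settings(state.get("integration_settings"))
+
+
+def integration_setting(state: dict[str, Any], key: str) -> dict[str, Any]:
+"""Return stored runtime settings for *key*."""
+return dict(integration_settings(state).get(key, {}))
+
+
+def write_integration_json(
+project_root: Path,
+*,
+version: str,
+integration_key: str | None,
+installed_integrations: list[str] | None = None,
+settings: dict[str, dict[str, Any]] | None = None,
+) -> None:
+"""Write ``.specify/integration.json`` with legacy-compatible state."""
+dest = project_root / INTEGRATION_JSON
+dest.parent.mkdir(parents=True, exist_ok=True)
+
+integration_key = clean_integration_key(integration_key)
+installed = dedupe_integration_keys(installed_integrations or [])
+if integration_key and integration_key not in installed:
+installed.insert(0, integration_key)
+if not integration_key and installed:
+integration_key = installed[0]
+
+normalized_settings = normalize_integration_settings(settings or {})
+normalized_settings = {
+key: normalized_settings[key] for key in installed if key in normalized_settings
+}
+
+data: dict[str, Any] = {
+"version": version,
+"integration_state_schema": INTEGRATION_STATE_SCHEMA,
+"installed_integrations": installed,
+"integration_settings": normalized_settings,
+}
+if integration_key:
+data["integration"] = integration_key
+data["default_integration"] = integration_key
+
+dest.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8")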
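Review bot: `write_integration_json` still writes with `dest.parent.mkdir(parents=True, exist_ok=True)` followed by `dest.write_text(...)`, which follows a symlinked `.specify` directory or a symlinked `integration.json` and writes in place. The manifest and shared-infrastructure writers added in this same commit refuse symlinked components, re-check containment under the project root, and replace the target through a temp file plus `os.replace`. Unless every caller preflights `.specify` before reaching this function (not visible in this section), the state file is the one write into the project tree left without that guard, and an interrupted write can leave truncated JSON behind. Routing this write through the same guarded temp-file-and-replace pattern would close the gap; if callers do guarantee a safe parent, a docstring line such as `Callers must have validated that .specify is a real directory inside project_root.` would make the contract explicit.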
@@ -19,3 +19,4 @@ class AuggieIntegration(MarkdownIntegration):
|
||||
"extension": ".md",
|
||||
}
|
||||
context_file = ".augment/rules/specify-rules.md"
|
||||
multi_install_safe = True
|
||||
|
||||
@@ -87,6 +87,14 @@ class IntegrationBase(ABC):
|
||||
invoke_separator: str = "."
|
||||
"""Separator used in slash-command invocations (``"."`` → ``/speckit.plan``)."""
|
||||
|
||||
multi_install_safe: bool = False
|
||||
"""Whether this integration is declared safe to install alongside others.
|
||||
|
||||
Safe integrations must use a static, unique agent root, command directory,
|
||||
and context file. Registry tests enforce those invariants for every
|
||||
integration that sets this flag.
|
||||
"""
|
||||
|
||||
# -- Markers for managed context section ------------------------------
|
||||
|
||||
CONTEXT_MARKER_START = "<!-- SPECKIT START -->"
|
||||
|
||||
@@ -53,6 +53,7 @@ class ClaudeIntegration(SkillsIntegration):
|
||||
"extension": "/SKILL.md",
|
||||
}
|
||||
context_file = "CLAUDE.md"
|
||||
multi_install_safe = True
|
||||
|
||||
@staticmethod
|
||||
def inject_argument_hint(content: str, hint: str) -> str:
|
||||
|
||||
@@ -19,3 +19,4 @@ class CodebuddyIntegration(MarkdownIntegration):
|
||||
"extension": ".md",
|
||||
}
|
||||
context_file = "CODEBUDDY.md"
|
||||
multi_install_safe = True
|
||||
|
||||
@@ -27,6 +27,7 @@ class CodexIntegration(SkillsIntegration):
|
||||
"extension": "/SKILL.md",
|
||||
}
|
||||
context_file = "AGENTS.md"
|
||||
multi_install_safe = True
|
||||
|
||||
def build_exec_args(
|
||||
self,
|
||||
|
||||
@@ -26,6 +26,7 @@ class CursorAgentIntegration(SkillsIntegration):
|
||||
}
|
||||
|
||||
context_file = ".cursor/rules/specify-rules.mdc"
|
||||
multi_install_safe = True
|
||||
|
||||
@classmethod
|
||||
def options(cls) -> list[IntegrationOption]:
|
||||
|
||||
@@ -19,3 +19,4 @@ class GeminiIntegration(TomlIntegration):
|
||||
"extension": ".toml",
|
||||
}
|
||||
context_file = "GEMINI.md"
|
||||
multi_install_safe = True
|
||||
|
||||
@@ -19,3 +19,4 @@ class IflowIntegration(MarkdownIntegration):
|
||||
"extension": ".md",
|
||||
}
|
||||
context_file = "IFLOW.md"
|
||||
multi_install_safe = True
|
||||
|
||||
@@ -19,3 +19,4 @@ class JunieIntegration(MarkdownIntegration):
|
||||
"extension": ".md",
|
||||
}
|
||||
context_file = ".junie/AGENTS.md"
|
||||
multi_install_safe = True
|
||||
|
||||
@@ -19,3 +19,4 @@ class KilocodeIntegration(MarkdownIntegration):
|
||||
"extension": ".md",
|
||||
}
|
||||
context_file = ".kilocode/rules/specify-rules.md"
|
||||
multi_install_safe = True
|
||||
|
||||
@@ -36,6 +36,7 @@ class KimiIntegration(SkillsIntegration):
|
||||
"extension": "/SKILL.md",
|
||||
}
|
||||
context_file = "KIMI.md"
|
||||
multi_install_safe = True
|
||||
|
||||
@classmethod
|
||||
def options(cls) -> list[IntegrationOption]:
|
||||
|
||||
@@ -11,6 +11,7 @@ from __future__ import annotations
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
@@ -47,6 +48,59 @@ def _validate_rel_path(rel: Path, root: Path) -> Path:
|
||||
return resolved
|
||||
|
||||
|
||||
def _manifest_path_label(root: Path, path: Path) -> str:
|
||||
try:
|
||||
return path.relative_to(root).as_posix()
|
||||
except ValueError:
|
||||
return path.as_posix()
|
||||
|
||||
|
||||
def _ensure_safe_manifest_directory(root: Path, directory: Path) -> None:
|
||||
"""Create a manifest directory without following symlinked parents."""
|
||||
root_resolved = root.resolve()
|
||||
try:
|
||||
rel = directory.relative_to(root)
|
||||
except ValueError:
|
||||
label = _manifest_path_label(root, directory)
|
||||
raise ValueError(f"Integration manifest directory escapes project root: {label}") from None
|
||||
|
||||
current = root
|
||||
for part in rel.parts:
|
||||
current = current / part
|
||||
label = _manifest_path_label(root, current)
|
||||
if current.is_symlink():
|
||||
raise ValueError(f"Refusing to use symlinked integration manifest directory: {label}")
|
||||
if current.exists():
|
||||
if not current.is_dir():
|
||||
raise ValueError(f"Integration manifest directory path is not a directory: {label}")
|
||||
try:
|
||||
current.resolve().relative_to(root_resolved)
|
||||
except (OSError, ValueError):
|
||||
raise ValueError(f"Integration manifest directory escapes project root: {label}") from None
|
||||
continue
|
||||
current.mkdir()
|
||||
try:
|
||||
current.resolve().relative_to(root_resolved)
|
||||
except (OSError, ValueError):
|
||||
raise ValueError(f"Integration manifest directory escapes project root: {label}") from None
|
||||
|
||||
|
||||
def _ensure_safe_manifest_destination(root: Path, path: Path) -> None:
|
||||
"""Refuse manifest writes that would escape the project or follow symlinks."""
|
||||
root_resolved = root.resolve()
|
||||
_ensure_safe_manifest_directory(root, path.parent)
|
||||
label = _manifest_path_label(root, path)
|
||||
if path.is_symlink():
|
||||
raise ValueError(f"Refusing to overwrite symlinked integration manifest path: {label}")
|
||||
if path.exists():
|
||||
if not path.is_file():
|
||||
raise ValueError(f"Integration manifest path is not a file: {label}")
|
||||
try:
|
||||
path.resolve().relative_to(root_resolved)
|
||||
except (OSError, ValueError):
|
||||
raise ValueError(f"Integration manifest path escapes project root: {label}") from None
|
||||
|
||||
|
||||
class IntegrationManifest:
|
||||
"""Tracks files installed by a single integration.
|
||||
|
||||
@@ -217,8 +271,19 @@ class IntegrationManifest:
|
||||
"files": self._files,
|
||||
}
|
||||
path = self.manifest_path
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8")
|
||||
content = json.dumps(data, indent=2) + "\n"
|
||||
_ensure_safe_manifest_destination(self.project_root, path)
|
||||
fd, temp_name = tempfile.mkstemp(prefix=f".{path.name}.", dir=path.parent)
|
||||
temp_path = Path(temp_name)
|
||||
try:
|
||||
with os.fdopen(fd, "w", encoding="utf-8") as fh:
|
||||
fh.write(content)
|
||||
temp_path.chmod(0o644)
|
||||
_ensure_safe_manifest_destination(self.project_root, path)
|
||||
os.replace(temp_path, path)
|
||||
finally:
|
||||
if temp_path.exists():
|
||||
temp_path.unlink()
|
||||
return path
|
||||
|
||||
@classmethod
|
||||
|
||||
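Review bot: `_ensure_safe_manifest_directory` repeats the component walk of `_ensure_safe_shared_directory` in shared_infra.py, and the two copies already differ: the shared-infra version re-checks `current.is_symlink()` after `current.mkdir()` and rejects `..` parts up front, while this one does neither. Security checks kept as two near-identical functions tend to drift further, so a single helper parameterised on the message prefix would be safer to maintain. Both guards are also path-based check-then-use, so they cover symlinks that exist when the check runs, not a parent component swapped between the second `_ensure_safe_manifest_destination` call and `os.replace` in `save` (whose body starts outside the hunk). A docstring line such as `Best-effort: rejects symlinks present at check time; not safe against concurrent modification of the project tree.` would state that limit. The unconditional `temp_path.chmod(0o644)` also resets the mode of an existing manifest that was deliberately more restrictive.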
@@ -19,3 +19,4 @@ class QodercliIntegration(MarkdownIntegration):
|
||||
"extension": ".md",
|
||||
}
|
||||
context_file = "QODER.md"
|
||||
multi_install_safe = True
|
||||
|
||||
@@ -19,3 +19,4 @@ class QwenIntegration(MarkdownIntegration):
|
||||
"extension": ".md",
|
||||
}
|
||||
context_file = "QWEN.md"
|
||||
multi_install_safe = True
|
||||
|
||||
@@ -19,3 +19,4 @@ class RooIntegration(MarkdownIntegration):
|
||||
"extension": ".md",
|
||||
}
|
||||
context_file = ".roo/rules/specify-rules.md"
|
||||
multi_install_safe = True
|
||||
|
||||
@@ -19,3 +19,4 @@ class ShaiIntegration(MarkdownIntegration):
|
||||
"extension": ".md",
|
||||
}
|
||||
context_file = "SHAI.md"
|
||||
multi_install_safe = True
|
||||
|
||||
@@ -19,3 +19,4 @@ class TabnineIntegration(TomlIntegration):
|
||||
"extension": ".toml",
|
||||
}
|
||||
context_file = "TABNINE.md"
|
||||
multi_install_safe = True
|
||||
|
||||
@@ -27,6 +27,7 @@ class TraeIntegration(SkillsIntegration):
|
||||
"extension": "/SKILL.md",
|
||||
}
|
||||
context_file = ".trae/rules/project_rules.md"
|
||||
multi_install_safe = True
|
||||
|
||||
@classmethod
|
||||
def options(cls) -> list[IntegrationOption]:
|
||||
|
||||
@@ -19,3 +19,4 @@ class WindsurfIntegration(MarkdownIntegration):
|
||||
"extension": ".md",
|
||||
}
|
||||
context_file = ".windsurf/rules/specify-rules.md"
|
||||
multi_install_safe = True
|
||||
|
||||
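--- a/src/specify_cli/shared_infra.py
+++ b/src/specify_cli/shared_infra.py
@@ -0,0 +1,317 @@
+"""Shared Spec Kit infrastructure installation helpers."""
+
+from __future__ import annotations
+
+import os
+import tempfile
+from pathlib import Path
+from typing import Any
+
+from .integrations.base import IntegrationBase
+from .integrations.manifest import IntegrationManifest
+
+
+def load_speckit_manifest(
+project_path: Path,
+*,
+version: str,
+console: Any | None = None,
+) -> IntegrationManifest:
+"""Load the shared infrastructure manifest, preserving existing entries."""
+manifest_path = project_path / ".specify" / "integrations" / "speckit.manifest.json"
+if manifest_path.exists():
+try:
+manifest = IntegrationManifest.load("speckit", project_path)
+manifest.version = version
+return manifest
+except (ValueError, FileNotFoundError, OSError, UnicodeDecodeError) as exc:
+if console is not None:
+console.print(
+f"[yellow]Warning:[/yellow] Could not read shared infrastructure "
+f"manifest at {manifest_path}: {exc}"
+)
+console.print(
+"A new shared manifest will be created; previously tracked "
+"shared files may be treated as untracked."
+)
+return IntegrationManifest("speckit", project_path, version=version)
+
+
+def shared_templates_source(
+*,
+core_pack: Path | None,
+repo_root: Path,
+) -> Path:
+"""Return the bundled/source shared templates directory."""
+if core_pack and (core_pack / "templates").is_dir():
+return core_pack / "templates"
+return repo_root / "templates"
+
+
+def shared_scripts_source(
+*,
+core_pack: Path | None,
+repo_root: Path,
+) -> Path:
+"""Return the bundled/source shared scripts directory."""
+if core_pack and (core_pack / "scripts").is_dir():
+return core_pack / "scripts"
+return repo_root / "scripts"
+
+
+def _shared_destination_label(project_path: Path, dest: Path) -> str:
+try:
+return dest.relative_to(project_path).as_posix()
+except ValueError:
+return str(dest)
+
+
+def _shared_relative_path(project_path: Path, dest: Path) -> Path:
+try:
+rel = dest.relative_to(project_path)
+except ValueError:
+label = _shared_destination_label(project_path, dest)
+raise ValueError(f"Shared infrastructure path escapes project root: {label}") from None
+
+if rel.is_absolute() or ".." in rel.parts:
+label = _shared_destination_label(project_path, dest)
+raise ValueError(f"Shared infrastructure path escapes project root: {label}")
+return rel
+
+
+def _ensure_safe_shared_directory(project_path: Path, directory: Path, *, create: bool = True) -> None:
+"""Create a shared infra directory without following symlinked parents."""
+root = project_path.resolve()
+rel = _shared_relative_path(project_path, directory)
+current = project_path
+
+for part in rel.parts:
+current = current / part
+label = _shared_destination_label(project_path, current)
+if current.is_symlink():
+raise ValueError(f"Refusing to use symlinked shared infrastructure directory: {label}")
+if current.exists():
+if not current.is_dir():
+raise ValueError(f"Shared infrastructure directory path is not a directory: {label}")
+try:
+current.resolve().relative_to(root)
+except (OSError, ValueError):
+raise ValueError(f"Shared infrastructure directory escapes project root: {label}") from None
+continue
+if not create:
+raise ValueError(f"Shared infrastructure directory does not exist: {label}")
+current.mkdir()
+if current.is_symlink():
+raise ValueError(f"Refusing to use symlinked shared infrastructure directory: {label}")
+try:
+current.resolve().relative_to(root)
+except (OSError, ValueError):
+raise ValueError(f"Shared infrastructure directory escapes project root: {label}") from None
+
+
+def _validate_safe_shared_directory(project_path: Path, directory: Path) -> None:
+"""Validate existing directory parents while allowing missing directories."""
+root = project_path.resolve()
+rel = _shared_relative_path(project_path, directory)
+current = project_path
+
+for part in rel.parts:
+current = current / part
+label = _shared_destination_label(project_path, current)
+if current.is_symlink():
+raise ValueError(f"Refusing to use symlinked shared infrastructure directory: {label}")
+if not current.exists():
+continue
+if not current.is_dir():
+raise ValueError(f"Shared infrastructure directory path is not a directory: {label}")
+try:
+current.resolve().relative_to(root)
+except (OSError, ValueError):
+raise ValueError(f"Shared infrastructure directory escapes project root: {label}") from None
+
+
+def _ensure_safe_shared_destination(
+project_path: Path,
+dest: Path,
+*,
+parent_must_exist: bool = True,
+) -> None:
+"""Refuse shared infra writes that would escape or follow symlinks."""
+root = project_path.resolve()
+_shared_relative_path(project_path, dest)
+if parent_must_exist:
+_ensure_safe_shared_directory(project_path, dest.parent, create=False)
+else:
+_validate_safe_shared_directory(project_path, dest.parent)
+label = _shared_destination_label(project_path, dest)
+if dest.is_symlink():
+raise ValueError(f"Refusing to overwrite symlinked shared infrastructure path: {label}")
+
+if dest.exists():
+try:
+dest.resolve().relative_to(root)
+except (OSError, ValueError):
+raise ValueError(f"Shared infrastructure destination escapes project root: {label}") from None
+
+
+def _write_shared_text(project_path: Path, dest: Path, content: str) -> None:
+_write_shared_bytes(project_path, dest, content.encode("utf-8"))
+
+
+def _write_shared_bytes(
+project_path: Path,
+dest: Path,
+content: bytes,
+*,
+mode: int = 0o644,
+) -> None:
+_ensure_safe_shared_destination(project_path, dest)
+fd, temp_name = tempfile.mkstemp(prefix=f".{dest.name}.", dir=dest.parent)
+temp_path = Path(temp_name)
+try:
+with os.fdopen(fd, "wb") as fh:
+fh.write(content)
+temp_path.chmod(mode)
+_ensure_safe_shared_destination(project_path, dest)
+os.replace(temp_path, dest)
+finally:
+if temp_path.exists():
+temp_path.unlink()
+
+
+def refresh_shared_templates(
+project_path: Path,
+*,
+version: str,
+core_pack: Path | None,
+repo_root: Path,
+console: Any,
+invoke_separator: str,
+force: bool = False,
+) -> None:
+"""Refresh default-sensitive shared templates without touching scripts."""
+templates_src = shared_templates_source(core_pack=core_pack, repo_root=repo_root)
+if not templates_src.is_dir():
+return
+
+manifest = load_speckit_manifest(project_path, version=version, console=console)
+tracked_files = manifest.files
+modified = set(manifest.check_modified())
+skipped_files: list[str] = []
+planned_updates: list[tuple[Path, str, str]] = []
+
+dest_templates = project_path / ".specify" / "templates"
+_ensure_safe_shared_directory(project_path, dest_templates)
+for src in templates_src.iterdir():
+if not src.is_file() or src.name == "vscode-settings.json" or src.name.startswith("."):
+continue
+
+dst = dest_templates / src.name
+_ensure_safe_shared_destination(project_path, dst)
+rel = dst.relative_to(project_path).as_posix()
+if dst.exists() and not force:
+if rel not in tracked_files or rel in modified:
+skipped_files.append(rel)
+continue
+
+content = src.read_text(encoding="utf-8")
+content = IntegrationBase.resolve_command_refs(content, invoke_separator)
+planned_updates.append((dst, rel, content))
+
+for dst, rel, content in planned_updates:
+_write_shared_text(project_path, dst, content)
+manifest.record_existing(rel)
+
+manifest.save()
+
+if skipped_files:
+console.print(
+f"[yellow]⚠[/yellow] {len(skipped_files)} modified or untracked shared template file(s) were not updated:"
+)
+for rel in skipped_files:
+console.print(f" {rel}")
+
+
+def install_shared_infra(
+project_path: Path,
+script_type: str,
+*,
+version: str,
+core_pack: Path | None,
+repo_root: Path,
+console: Any,
+force: bool = False,
+invoke_separator: str = ".",
+) -> bool:
+"""Install shared scripts and templates into *project_path*."""
+manifest = load_speckit_manifest(project_path, version=version, console=console)
+skipped_files: list[str] = []
+planned_copies: list[tuple[Path, str, bytes, int]] = []
+planned_templates: list[tuple[Path, str, str]] = []
+
+scripts_src = shared_scripts_source(core_pack=core_pack, repo_root=repo_root)
+if scripts_src.is_dir():
+dest_scripts = project_path / ".specify" / "scripts"
+_ensure_safe_shared_directory(project_path, dest_scripts)
+variant_dir = "bash" if script_type == "sh" else "powershell"
+variant_src = scripts_src / variant_dir
+if variant_src.is_dir():
+dest_variant = dest_scripts / variant_dir
+_ensure_safe_shared_directory(project_path, dest_variant)
+for src_path in variant_src.rglob("*"):
+if not src_path.is_file():
+continue
+
+rel_path = src_path.relative_to(variant_src)
+dst_path = dest_variant / rel_path
+_ensure_safe_shared_destination(project_path, dst_path, parent_must_exist=False)
+if dst_path.exists() and not force:
+skipped_files.append(dst_path.relative_to(project_path).as_posix())
+continue
+
+_ensure_safe_shared_directory(project_path, dst_path.parent)
+rel = dst_path.relative_to(project_path).as_posix()
+planned_copies.append((dst_path, rel, src_path.read_bytes(), src_path.stat().st_mode & 0o777))
+
+templates_src = shared_templates_source(core_pack=core_pack, repo_root=repo_root)
+if templates_src.is_dir():
+dest_templates = project_path / ".specify" / "templates"
+_ensure_safe_shared_directory(project_path, dest_templates)
+for src in templates_src.iterdir():
+if not src.is_file() or src.name == "vscode-settings.json" or src.name.startswith("."):
+continue
+
+dst = dest_templates / src.name
+_ensure_safe_shared_destination(project_path, dst)
+if dst.exists() and not force:
+skipped_files.append(dst.relative_to(project_path).as_posix())
+continue
+
+content = src.read_text(encoding="utf-8")
+content = IntegrationBase.resolve_command_refs(content, invoke_separator)
+rel = dst.relative_to(project_path).as_posix()
+planned_templates.append((dst, rel, content))
+
+for dst_path, rel, content, mode in planned_copies:
+_ensure_safe_shared_directory(project_path, dst_path.parent)
+_write_shared_bytes(project_path, dst_path, content, mode=mode)
+manifest.record_existing(rel)
+
+for dst, rel, content in planned_templates:
+_write_shared_text(project_path, dst, content)
+manifest.record_existing(rel)
+
+if skipped_files:
+console.print(
+f"[yellow]⚠[/yellow] {len(skipped_files)} shared infrastructure file(s) already exist and were not updated:"
+)
+for path in skipped_files:
+console.print(f" {path}")
+console.print(
+"To refresh shared infrastructure, run "
+"[cyan]specify init --here --force[/cyan] or "
+"[cyan]specify integration upgrade --force[/cyan]."
+)
+
+manifest.save()
+return True
+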
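Review bot: `install_shared_infra` copies each script's permission bits verbatim with `src_path.stat().st_mode & 0o777`, and `_write_shared_bytes` applies them through `temp_path.chmod(mode)`, which is not filtered by the user's umask. If a bundled script happens to be group- or world-writable in the installed package, the executable copy under `.specify/scripts` ends up writable by the same parties. Masking at collection time, `src_path.stat().st_mode & 0o755`, keeps the execute bits while dropping group/other write. Two smaller points: `variant_dir = "bash" if script_type == "sh" else "powershell"` silently treats any unrecognised `script_type` as PowerShell, and `_write_shared_bytes` has no docstring; it should say that the destination guard is a path-based check repeated just before `os.replace`, so it rejects symlinks present at check time rather than performing a descriptor-based no-follow write.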