"""System utilities: subprocess, tool detection, file operations.""" from __future__ import annotations import json import json5 import os import shutil import stat import subprocess import tempfile import yaml from pathlib import Path, PurePosixPath, PureWindowsPath from typing import Any from ._console import console CLAUDE_LOCAL_PATH = Path.home() / ".claude" / "local" / "claude" CLAUDE_NPM_LOCAL_PATH = Path.home() / ".claude" / "local" / "node_modules" / ".bin" / "claude" def relative_extension_path_violation(value: Any) -> str | None: """Return why ``value`` is unsafe as an extension-relative ``file`` path. Single source of truth for the path-safety policy shared by ``ExtensionManifest._validate()`` (manifest-load validation) and ``CommandRegistrar.register_commands()`` (runtime guard), so the two cannot drift. Returns a human-readable reason string when ``value`` is unsafe, or ``None`` when it is an acceptable relative path within the extension directory. Policy: the value must be a non-empty string with no leading/trailing whitespace, no absolute/anchored form, and no ``..`` traversal. The value is evaluated under both POSIX and Windows path semantics because a native ``Path`` is OS-dependent (a ``PurePosixPath`` on POSIX does not interpret Windows drive/UNC forms, and ``C:foo`` is anchored but not ``is_absolute()`` yet resolves against the CWD on its drive). Rejecting any non-empty anchor covers POSIX-absolute (``/abs``), Windows drive-relative (``C:foo``), Windows absolute (``C:\\foo``), and UNC/rooted forms. """ if not isinstance(value, str) or not value: return "must be a non-empty string" if value.strip() != value: return "must not have leading or trailing whitespace" posix_path = PurePosixPath(value) win_path = PureWindowsPath(value) if ( posix_path.anchor or win_path.anchor or ".." in posix_path.parts or ".." in win_path.parts ): return ( "must be a relative path within the extension directory " "(no absolute paths, drive letters, or '..' segments)" ) return None def dump_frontmatter(data: dict[str, Any]) -> str: """Serialize skill/command frontmatter to a YAML string. Centralizes the dump options used for SKILL.md frontmatter: ``allow_unicode`` preserves Unicode descriptions and ``sort_keys=False`` keeps key order, so no call site can silently drop either. """ return yaml.safe_dump(data, sort_keys=False, allow_unicode=True).strip() def run_command( cmd: list[str], check_return: bool = True, capture: bool = False, shell: bool = False, ) -> str | None: """Run a command without invoking a shell and optionally capture output. The ``shell`` parameter is kept in the signature so existing keyword callers (and the re-export from ``specify_cli``) don't raise ``TypeError``, but only the default ``shell=False`` is honoured. ``shell=True`` is rejected with ``ValueError`` rather than silently ignored, so the unsupported mode fails loudly instead of running with a different meaning. """ if shell: raise ValueError( "run_command() does not support shell=True; pass argv as a list" ) try: if capture: result = subprocess.run(cmd, check=check_return, capture_output=True, text=True) return result.stdout.strip() else: subprocess.run(cmd, check=check_return) return None except subprocess.CalledProcessError as e: if check_return: console.print(f"[red]Error running command:[/red] {' '.join(cmd)}") console.print(f"[red]Exit code:[/red] {e.returncode}") if hasattr(e, 'stderr') and e.stderr: console.print(f"[red]Error output:[/red] {e.stderr}") raise return None def check_tool(tool: str, tracker=None) -> bool: """Check if a tool is installed. Optionally update tracker. Args: tool: Name of the tool to check tracker: StepTracker | None to update with results Returns: True if tool is found, False otherwise """ # Special handling for Claude CLI local installs # See: https://github.com/github/spec-kit/issues/123 # See: https://github.com/github/spec-kit/issues/550 # Claude Code can be installed in two local paths: # 1. ~/.claude/local/claude (after `claude migrate-installer`) # 2. ~/.claude/local/node_modules/.bin/claude (npm-local install, e.g. via nvm) # Neither path may be on the system PATH, so we check them explicitly. if tool == "claude": if CLAUDE_LOCAL_PATH.is_file() or CLAUDE_NPM_LOCAL_PATH.is_file(): if tracker: tracker.complete(tool, "available") return True # Per-integration executable resolution. if tool == "kiro-cli": # Kiro currently supports both executable names. Prefer kiro-cli and # accept kiro as a compatibility fallback. found = shutil.which("kiro-cli") is not None or shutil.which("kiro") is not None elif tool == "rovodev": found = shutil.which("acli") is not None else: found = shutil.which(tool) is not None if tracker: if found: tracker.complete(tool, "available") else: tracker.error(tool, "not found") return found def handle_vscode_settings(sub_item, dest_file, rel_path, verbose=False, tracker=None) -> None: """Handle merging or copying of .vscode/settings.json files. Note: when merge produces changes, rewritten output is normalized JSON and existing JSONC comments/trailing commas are not preserved. """ def log(message, color="green"): if verbose and not tracker: console.print(f"[{color}]{message}[/] {rel_path}") def atomic_write_json(target_file: Path, payload: dict[str, Any]) -> None: """Atomically write JSON while preserving existing mode bits when possible.""" temp_path: Path | None = None try: with tempfile.NamedTemporaryFile( mode='w', encoding='utf-8', dir=target_file.parent, prefix=f"{target_file.name}.", suffix=".tmp", delete=False, ) as f: temp_path = Path(f.name) json.dump(payload, f, indent=4) f.write('\n') if target_file.exists(): try: existing_stat = target_file.stat() os.chmod(temp_path, stat.S_IMODE(existing_stat.st_mode)) if hasattr(os, "chown"): try: os.chown(temp_path, existing_stat.st_uid, existing_stat.st_gid) except PermissionError: # Best-effort owner/group preservation without requiring elevated privileges. pass except OSError: # Best-effort metadata preservation; data safety is prioritized. pass os.replace(temp_path, target_file) except Exception: if temp_path and temp_path.exists(): temp_path.unlink() raise try: with open(sub_item, 'r', encoding='utf-8') as f: # json5 natively supports comments and trailing commas (JSONC) new_settings = json5.load(f) if dest_file.exists(): merged = merge_json_files(dest_file, new_settings, verbose=verbose and not tracker) if merged is not None: atomic_write_json(dest_file, merged) log("Merged:", "green") log("Note: comments/trailing commas are normalized when rewritten", "yellow") else: log("Skipped merge (preserved existing settings)", "yellow") else: shutil.copy2(sub_item, dest_file) log("Copied (no existing settings.json):", "blue") except Exception as e: log(f"Warning: Could not merge settings: {e}", "yellow") if not dest_file.exists(): shutil.copy2(sub_item, dest_file) def merge_json_files(existing_path: Path, new_content: Any, verbose: bool = False) -> dict[str, Any] | None: """Merge new JSON content into existing JSON file. Performs a polite deep merge where: - New keys are added - Existing keys are preserved (not overwritten) unless both values are dictionaries - Nested dictionaries are merged recursively only when both sides are dictionaries - Lists and other values are preserved from base if they exist Args: existing_path: Path to existing JSON file new_content: New JSON content to merge in verbose: Whether to print merge details Returns: Merged JSON content as dict, or None if the existing file should be left untouched. """ # Load existing content first to have a safe fallback existing_content = None exists = existing_path.exists() if exists: try: with open(existing_path, 'r', encoding='utf-8') as f: # Handle comments (JSONC) natively with json5 # Note: json5 handles BOM automatically existing_content = json5.load(f) except FileNotFoundError: # Handle race condition where file is deleted after exists() check exists = False except Exception as e: if verbose: console.print(f"[yellow]Warning: Could not read or parse existing JSON in {existing_path.name} ({e}).[/yellow]") # Skip merge to preserve existing file if unparseable or inaccessible (e.g. PermissionError) return None # Validate template content if not isinstance(new_content, dict): if verbose: console.print(f"[yellow]Warning: Template content for {existing_path.name} is not a dictionary. Preserving existing settings.[/yellow]") return None if not exists: return new_content # If existing content parsed but is not a dict, skip merge to avoid data loss if not isinstance(existing_content, dict): if verbose: console.print(f"[yellow]Warning: Existing JSON in {existing_path.name} is not an object. Skipping merge to avoid data loss.[/yellow]") return None def deep_merge_polite(base: dict[str, Any], update: dict[str, Any]) -> dict[str, Any]: """Recursively merge update dict into base dict, preserving base values.""" result = base.copy() for key, value in update.items(): if key not in result: # Add new key result[key] = value elif isinstance(result[key], dict) and isinstance(value, dict): # Recursively merge nested dictionaries result[key] = deep_merge_polite(result[key], value) else: # Key already exists and values are not both dicts; preserve existing value. # This ensures user settings aren't overwritten by template defaults. pass return result merged = deep_merge_polite(existing_content, new_content) # Detect if anything actually changed. If not, return None so the caller # can skip rewriting the file (preserving user's comments/formatting). if merged == existing_content: return None if verbose: console.print(f"[cyan]Merged JSON file:[/cyan] {existing_path.name}") return merged def _display_project_path(project_root: Path, path: str | Path) -> str: """Return a stable POSIX-style display path for paths under a project.""" path_obj = Path(path) try: rel_path = path_obj.relative_to(project_root) if path_obj.is_absolute() else path_obj except ValueError: try: rel_path = path_obj.resolve().relative_to(project_root.resolve()) except (OSError, ValueError): return path_obj.as_posix() return rel_path.as_posix() def version_satisfies(current: str, required: str) -> bool: """Check if current version satisfies required version specifier. Evaluates the version against the specifier using the project's prerelease policy (prereleases are allowed). Args: current: Current version (e.g., "0.1.5") required: Required version specifier (e.g., ">=0.1.0,<2.0.0") Returns: True if version satisfies requirement """ from packaging import version as pkg_version from packaging.specifiers import InvalidSpecifier, SpecifierSet try: current_ver = pkg_version.Version(current) specifier = SpecifierSet(required) return specifier.contains(current_ver, prereleases=True) except (pkg_version.InvalidVersion, InvalidSpecifier): return False