refactor: extract _assets.py and _utils.py from __init__.py (PR-2/8) (#2543)

* refactor: extract _assets.py and _utils.py from __init__.py

Move bundle path resolution and version lookup into _assets.py (stdlib only,
zero internal imports), and system utilities (subprocess, tool detection,
file operations) into _utils.py (imports only from ._console). Re-export all
moved symbols from __init__.py for backward compatibility. Update
test_check_tool.py to patch both specify_cli and specify_cli._utils namespaces
since constants are now defined in _utils.

* style: apply PR-1 review patterns to _assets.py and _utils.py

- Add module docstring to _assets.py (stdlib-only, zero internal imports)
- Add blank line after `from __future__ import annotations` in both files
- Replace `Optional[X]` with `X | None` throughout _utils.py (PEP 604)
- Remove unused `Optional` import from _utils.py
- Use explicit re-export form (`X as X`) for public symbols in __init__.py
- Remove unused `subprocess` and `tempfile` imports from __init__.py (moved to _utils.py)
This commit is contained in:
darion-yaphet
2026-05-14 00:20:36 +08:00
committed by GitHub
parent 9732a4d092
commit 2fb9d3bb4b
5 changed files with 453 additions and 386 deletions

View File

@@ -27,14 +27,10 @@ Or install globally:
"""
import os
import subprocess
import sys
import zipfile
import tempfile
import shutil
import json
import json5
import stat
import shlex
import urllib.error
import urllib.request
@@ -80,6 +76,25 @@ from ._console import (
select_with_arrows,
show_banner,
)
from ._assets import (
_locate_bundled_extension,
_locate_bundled_preset,
_locate_bundled_workflow,
_locate_core_pack,
_repo_root,
get_speckit_version as get_speckit_version,
)
from ._utils import (
CLAUDE_LOCAL_PATH as CLAUDE_LOCAL_PATH,
CLAUDE_NPM_LOCAL_PATH as CLAUDE_NPM_LOCAL_PATH,
_display_project_path,
check_tool as check_tool,
handle_vscode_settings as handle_vscode_settings,
init_git_repo as init_git_repo,
is_git_repo as is_git_repo,
merge_json_files as merge_json_files,
run_command as run_command,
)
GITHUB_API_LATEST = "https://api.github.com/repos/github/spec-kit/releases/latest"
@@ -161,9 +176,6 @@ def _stdin_is_interactive() -> bool:
SCRIPT_TYPE_CHOICES = {"sh": "POSIX Shell (bash/zsh)", "ps": "PowerShell"}
CLAUDE_LOCAL_PATH = Path.home() / ".claude" / "local" / "claude"
CLAUDE_NPM_LOCAL_PATH = Path.home() / ".claude" / "local" / "node_modules" / ".bin" / "claude"
app = typer.Typer(
name="specify",
help="Setup tool for Specify spec-driven development projects",
@@ -188,351 +200,6 @@ def callback(
console.print(Align.center("[dim]Run 'specify --help' for usage information[/dim]"))
console.print()
def run_command(cmd: list[str], check_return: bool = True, capture: bool = False, shell: bool = False) -> Optional[str]:
"""Run a shell command and optionally capture output."""
try:
if capture:
result = subprocess.run(cmd, check=check_return, capture_output=True, text=True, shell=shell)
return result.stdout.strip()
else:
subprocess.run(cmd, check=check_return, shell=shell)
return None
except subprocess.CalledProcessError as e:
if check_return:
console.print(f"[red]Error running command:[/red] {' '.join(cmd)}")
console.print(f"[red]Exit code:[/red] {e.returncode}")
if hasattr(e, 'stderr') and e.stderr:
console.print(f"[red]Error output:[/red] {e.stderr}")
raise
return None
def check_tool(tool: str, tracker: StepTracker = None) -> bool:
"""Check if a tool is installed. Optionally update tracker.
Args:
tool: Name of the tool to check
tracker: Optional StepTracker to update with results
Returns:
True if tool is found, False otherwise
"""
# Special handling for Claude CLI local installs
# See: https://github.com/github/spec-kit/issues/123
# See: https://github.com/github/spec-kit/issues/550
# Claude Code can be installed in two local paths:
# 1. ~/.claude/local/claude (after `claude migrate-installer`)
# 2. ~/.claude/local/node_modules/.bin/claude (npm-local install, e.g. via nvm)
# Neither path may be on the system PATH, so we check them explicitly.
if tool == "claude":
if CLAUDE_LOCAL_PATH.is_file() or CLAUDE_NPM_LOCAL_PATH.is_file():
if tracker:
tracker.complete(tool, "available")
return True
if tool == "kiro-cli":
# Kiro currently supports both executable names. Prefer kiro-cli and
# accept kiro as a compatibility fallback.
found = shutil.which("kiro-cli") is not None or shutil.which("kiro") is not None
else:
found = shutil.which(tool) is not None
if tracker:
if found:
tracker.complete(tool, "available")
else:
tracker.error(tool, "not found")
return found
def is_git_repo(path: Path = None) -> bool:
"""Check if the specified path is inside a git repository."""
if path is None:
path = Path.cwd()
if not path.is_dir():
return False
try:
subprocess.run(
["git", "rev-parse", "--is-inside-work-tree"],
check=True,
capture_output=True,
cwd=path,
)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
def init_git_repo(project_path: Path, quiet: bool = False) -> tuple[bool, Optional[str]]:
"""Initialize a git repository in the specified path."""
try:
original_cwd = Path.cwd()
os.chdir(project_path)
if not quiet:
console.print("[cyan]Initializing git repository...[/cyan]")
subprocess.run(["git", "init"], check=True, capture_output=True, text=True)
subprocess.run(["git", "add", "."], check=True, capture_output=True, text=True)
subprocess.run(["git", "commit", "-m", "Initial commit from Specify template"], check=True, capture_output=True, text=True)
if not quiet:
console.print("[green]✓[/green] Git repository initialized")
return True, None
except subprocess.CalledProcessError as e:
error_msg = f"Command: {' '.join(e.cmd)}\nExit code: {e.returncode}"
if e.stderr:
error_msg += f"\nError: {e.stderr.strip()}"
elif e.stdout:
error_msg += f"\nOutput: {e.stdout.strip()}"
if not quiet:
console.print(f"[red]Error initializing git repository:[/red] {e}")
return False, error_msg
finally:
os.chdir(original_cwd)
def handle_vscode_settings(sub_item, dest_file, rel_path, verbose=False, tracker=None) -> None:
"""Handle merging or copying of .vscode/settings.json files.
Note: when merge produces changes, rewritten output is normalized JSON and
existing JSONC comments/trailing commas are not preserved.
"""
def log(message, color="green"):
if verbose and not tracker:
console.print(f"[{color}]{message}[/] {rel_path}")
def atomic_write_json(target_file: Path, payload: dict[str, Any]) -> None:
"""Atomically write JSON while preserving existing mode bits when possible."""
temp_path: Optional[Path] = None
try:
with tempfile.NamedTemporaryFile(
mode='w',
encoding='utf-8',
dir=target_file.parent,
prefix=f"{target_file.name}.",
suffix=".tmp",
delete=False,
) as f:
temp_path = Path(f.name)
json.dump(payload, f, indent=4)
f.write('\n')
if target_file.exists():
try:
existing_stat = target_file.stat()
os.chmod(temp_path, stat.S_IMODE(existing_stat.st_mode))
if hasattr(os, "chown"):
try:
os.chown(temp_path, existing_stat.st_uid, existing_stat.st_gid)
except PermissionError:
# Best-effort owner/group preservation without requiring elevated privileges.
pass
except OSError:
# Best-effort metadata preservation; data safety is prioritized.
pass
os.replace(temp_path, target_file)
except Exception:
if temp_path and temp_path.exists():
temp_path.unlink()
raise
try:
with open(sub_item, 'r', encoding='utf-8') as f:
# json5 natively supports comments and trailing commas (JSONC)
new_settings = json5.load(f)
if dest_file.exists():
merged = merge_json_files(dest_file, new_settings, verbose=verbose and not tracker)
if merged is not None:
atomic_write_json(dest_file, merged)
log("Merged:", "green")
log("Note: comments/trailing commas are normalized when rewritten", "yellow")
else:
log("Skipped merge (preserved existing settings)", "yellow")
else:
shutil.copy2(sub_item, dest_file)
log("Copied (no existing settings.json):", "blue")
except Exception as e:
log(f"Warning: Could not merge settings: {e}", "yellow")
if not dest_file.exists():
shutil.copy2(sub_item, dest_file)
def merge_json_files(existing_path: Path, new_content: Any, verbose: bool = False) -> Optional[dict[str, Any]]:
"""Merge new JSON content into existing JSON file.
Performs a polite deep merge where:
- New keys are added
- Existing keys are preserved (not overwritten) unless both values are dictionaries
- Nested dictionaries are merged recursively only when both sides are dictionaries
- Lists and other values are preserved from base if they exist
Args:
existing_path: Path to existing JSON file
new_content: New JSON content to merge in
verbose: Whether to print merge details
Returns:
Merged JSON content as dict, or None if the existing file should be left untouched.
"""
# Load existing content first to have a safe fallback
existing_content = None
exists = existing_path.exists()
if exists:
try:
with open(existing_path, 'r', encoding='utf-8') as f:
# Handle comments (JSONC) natively with json5
# Note: json5 handles BOM automatically
existing_content = json5.load(f)
except FileNotFoundError:
# Handle race condition where file is deleted after exists() check
exists = False
except Exception as e:
if verbose:
console.print(f"[yellow]Warning: Could not read or parse existing JSON in {existing_path.name} ({e}).[/yellow]")
# Skip merge to preserve existing file if unparseable or inaccessible (e.g. PermissionError)
return None
# Validate template content
if not isinstance(new_content, dict):
if verbose:
console.print(f"[yellow]Warning: Template content for {existing_path.name} is not a dictionary. Preserving existing settings.[/yellow]")
return None
if not exists:
return new_content
# If existing content parsed but is not a dict, skip merge to avoid data loss
if not isinstance(existing_content, dict):
if verbose:
console.print(f"[yellow]Warning: Existing JSON in {existing_path.name} is not an object. Skipping merge to avoid data loss.[/yellow]")
return None
def deep_merge_polite(base: dict[str, Any], update: dict[str, Any]) -> dict[str, Any]:
"""Recursively merge update dict into base dict, preserving base values."""
result = base.copy()
for key, value in update.items():
if key not in result:
# Add new key
result[key] = value
elif isinstance(result[key], dict) and isinstance(value, dict):
# Recursively merge nested dictionaries
result[key] = deep_merge_polite(result[key], value)
else:
# Key already exists and values are not both dicts; preserve existing value.
# This ensures user settings aren't overwritten by template defaults.
pass
return result
merged = deep_merge_polite(existing_content, new_content)
# Detect if anything actually changed. If not, return None so the caller
# can skip rewriting the file (preserving user's comments/formatting).
if merged == existing_content:
return None
if verbose:
console.print(f"[cyan]Merged JSON file:[/cyan] {existing_path.name}")
return merged
def _locate_core_pack() -> Path | None:
"""Return the filesystem path to the bundled core_pack directory, or None.
Only present in wheel installs: hatchling's force-include copies
templates/, scripts/ etc. into specify_cli/core_pack/ at build time.
Source-checkout and editable installs do NOT have this directory.
Callers that need to work in both environments must check the repo-root
trees (templates/, scripts/) as a fallback when this returns None.
"""
# Wheel install: core_pack is a sibling directory of this file
candidate = Path(__file__).parent / "core_pack"
if candidate.is_dir():
return candidate
return None
def _repo_root() -> Path:
"""Return the source checkout root used for editable installs."""
return Path(__file__).parent.parent.parent
def _locate_bundled_extension(extension_id: str) -> Path | None:
"""Return the path to a bundled extension, or None.
Checks the wheel's core_pack first, then falls back to the
source-checkout ``extensions/<id>/`` directory.
"""
import re as _re
if not _re.match(r'^[a-z0-9-]+$', extension_id):
return None
core = _locate_core_pack()
if core is not None:
candidate = core / "extensions" / extension_id
if (candidate / "extension.yml").is_file():
return candidate
# Source-checkout / editable install: look relative to repo root
candidate = _repo_root() / "extensions" / extension_id
if (candidate / "extension.yml").is_file():
return candidate
return None
def _locate_bundled_workflow(workflow_id: str) -> Path | None:
"""Return the path to a bundled workflow directory, or None.
Checks the wheel's core_pack first, then falls back to the
source-checkout ``workflows/<id>/`` directory.
"""
import re as _re
if not _re.match(r'^[a-z0-9](?:[a-z0-9-]*[a-z0-9])?$', workflow_id):
return None
core = _locate_core_pack()
if core is not None:
candidate = core / "workflows" / workflow_id
if (candidate / "workflow.yml").is_file():
return candidate
# Source-checkout / editable install: look relative to repo root
candidate = _repo_root() / "workflows" / workflow_id
if (candidate / "workflow.yml").is_file():
return candidate
return None
def _locate_bundled_preset(preset_id: str) -> Path | None:
"""Return the path to a bundled preset, or None.
Checks the wheel's core_pack first, then falls back to the
source-checkout ``presets/<id>/`` directory.
"""
import re as _re
if not _re.match(r'^[a-z0-9-]+$', preset_id):
return None
core = _locate_core_pack()
if core is not None:
candidate = core / "presets" / preset_id
if (candidate / "preset.yml").is_file():
return candidate
# Source-checkout / editable install: look relative to repo root
candidate = _repo_root() / "presets" / preset_id
if (candidate / "preset.yml").is_file():
return candidate
return None
def _refresh_shared_templates(
project_path: Path,
*,
@@ -1701,27 +1368,6 @@ preset_catalog_app = typer.Typer(
preset_app.add_typer(preset_catalog_app, name="catalog")
def get_speckit_version() -> str:
"""Get current spec-kit version."""
import importlib.metadata
try:
return importlib.metadata.version("specify-cli")
except Exception:
# Fallback: try reading from pyproject.toml
try:
import tomllib
pyproject_path = _repo_root() / "pyproject.toml"
if pyproject_path.exists():
with open(pyproject_path, "rb") as f:
data = tomllib.load(f)
return data.get("project", {}).get("version", "unknown")
except Exception:
# Intentionally ignore any errors while reading/parsing pyproject.toml.
# If this lookup fails for any reason, we fall back to returning "unknown" below.
pass
return "unknown"
# ===== Integration Commands =====
integration_app = typer.Typer(
@@ -1918,19 +1564,6 @@ def _set_default_integration_or_exit(*args: Any, **kwargs: Any) -> None:
raise typer.Exit(1)
def _display_project_path(project_root: Path, path: str | Path) -> str:
"""Return a stable POSIX-style display path for paths under a project."""
path_obj = Path(path)
try:
rel_path = path_obj.relative_to(project_root) if path_obj.is_absolute() else path_obj
except ValueError:
try:
rel_path = path_obj.resolve().relative_to(project_root.resolve())
except (OSError, ValueError):
return path_obj.as_posix()
return rel_path.as_posix()
def _require_specify_project() -> Path:
"""Return the current project root if it is a spec-kit project, else exit."""
project_root = Path.cwd()

121
src/specify_cli/_assets.py Normal file
View File

@@ -0,0 +1,121 @@
"""Bundle path resolution and version lookup for specify_cli.
Stdlib-only; zero internal imports so it sits at the base of the dependency
graph without risk of circular imports.
"""
from __future__ import annotations
import importlib.metadata
import re
from pathlib import Path
def _locate_core_pack() -> Path | None:
"""Return the filesystem path to the bundled core_pack directory, or None.
Only present in wheel installs: hatchling's force-include copies
templates/, scripts/ etc. into specify_cli/core_pack/ at build time.
Source-checkout and editable installs do NOT have this directory.
Callers that need to work in both environments must check the repo-root
trees (templates/, scripts/) as a fallback when this returns None.
"""
# Wheel install: core_pack is a sibling directory of this file
candidate = Path(__file__).parent / "core_pack"
if candidate.is_dir():
return candidate
return None
def _repo_root() -> Path:
"""Return the source checkout root used for editable installs."""
return Path(__file__).parent.parent.parent
def _locate_bundled_extension(extension_id: str) -> Path | None:
"""Return the path to a bundled extension, or None.
Checks the wheel's core_pack first, then falls back to the
source-checkout ``extensions/<id>/`` directory.
"""
if not re.match(r'^[a-z0-9-]+$', extension_id):
return None
core = _locate_core_pack()
if core is not None:
candidate = core / "extensions" / extension_id
if (candidate / "extension.yml").is_file():
return candidate
# Source-checkout / editable install: look relative to repo root
candidate = _repo_root() / "extensions" / extension_id
if (candidate / "extension.yml").is_file():
return candidate
return None
def _locate_bundled_workflow(workflow_id: str) -> Path | None:
"""Return the path to a bundled workflow directory, or None.
Checks the wheel's core_pack first, then falls back to the
source-checkout ``workflows/<id>/`` directory.
"""
if not re.match(r'^[a-z0-9](?:[a-z0-9-]*[a-z0-9])?$', workflow_id):
return None
core = _locate_core_pack()
if core is not None:
candidate = core / "workflows" / workflow_id
if (candidate / "workflow.yml").is_file():
return candidate
# Source-checkout / editable install: look relative to repo root
candidate = _repo_root() / "workflows" / workflow_id
if (candidate / "workflow.yml").is_file():
return candidate
return None
def _locate_bundled_preset(preset_id: str) -> Path | None:
"""Return the path to a bundled preset, or None.
Checks the wheel's core_pack first, then falls back to the
source-checkout ``presets/<id>/`` directory.
"""
if not re.match(r'^[a-z0-9-]+$', preset_id):
return None
core = _locate_core_pack()
if core is not None:
candidate = core / "presets" / preset_id
if (candidate / "preset.yml").is_file():
return candidate
# Source-checkout / editable install: look relative to repo root
candidate = _repo_root() / "presets" / preset_id
if (candidate / "preset.yml").is_file():
return candidate
return None
def get_speckit_version() -> str:
"""Get current spec-kit version."""
try:
return importlib.metadata.version("specify-cli")
except Exception:
# Fallback: try reading from pyproject.toml
try:
import tomllib
pyproject_path = _repo_root() / "pyproject.toml"
if pyproject_path.exists():
with open(pyproject_path, "rb") as f:
data = tomllib.load(f)
return data.get("project", {}).get("version", "unknown")
except Exception:
# Intentionally ignore any errors while reading/parsing pyproject.toml.
# If this lookup fails for any reason, we fall back to returning "unknown" below.
pass
return "unknown"

282
src/specify_cli/_utils.py Normal file
View File

@@ -0,0 +1,282 @@
"""System utilities: subprocess, tool detection, file operations."""
from __future__ import annotations
import json
import json5
import os
import shutil
import stat
import subprocess
import tempfile
from pathlib import Path
from typing import Any
from ._console import console
CLAUDE_LOCAL_PATH = Path.home() / ".claude" / "local" / "claude"
CLAUDE_NPM_LOCAL_PATH = Path.home() / ".claude" / "local" / "node_modules" / ".bin" / "claude"
def run_command(cmd: list[str], check_return: bool = True, capture: bool = False, shell: bool = False) -> str | None:
"""Run a shell command and optionally capture output."""
try:
if capture:
result = subprocess.run(cmd, check=check_return, capture_output=True, text=True, shell=shell)
return result.stdout.strip()
else:
subprocess.run(cmd, check=check_return, shell=shell)
return None
except subprocess.CalledProcessError as e:
if check_return:
console.print(f"[red]Error running command:[/red] {' '.join(cmd)}")
console.print(f"[red]Exit code:[/red] {e.returncode}")
if hasattr(e, 'stderr') and e.stderr:
console.print(f"[red]Error output:[/red] {e.stderr}")
raise
return None
def check_tool(tool: str, tracker=None) -> bool:
"""Check if a tool is installed. Optionally update tracker.
Args:
tool: Name of the tool to check
tracker: StepTracker | None to update with results
Returns:
True if tool is found, False otherwise
"""
# Special handling for Claude CLI local installs
# See: https://github.com/github/spec-kit/issues/123
# See: https://github.com/github/spec-kit/issues/550
# Claude Code can be installed in two local paths:
# 1. ~/.claude/local/claude (after `claude migrate-installer`)
# 2. ~/.claude/local/node_modules/.bin/claude (npm-local install, e.g. via nvm)
# Neither path may be on the system PATH, so we check them explicitly.
if tool == "claude":
if CLAUDE_LOCAL_PATH.is_file() or CLAUDE_NPM_LOCAL_PATH.is_file():
if tracker:
tracker.complete(tool, "available")
return True
if tool == "kiro-cli":
# Kiro currently supports both executable names. Prefer kiro-cli and
# accept kiro as a compatibility fallback.
found = shutil.which("kiro-cli") is not None or shutil.which("kiro") is not None
else:
found = shutil.which(tool) is not None
if tracker:
if found:
tracker.complete(tool, "available")
else:
tracker.error(tool, "not found")
return found
def is_git_repo(path: Path | None = None) -> bool:
"""Check if the specified path is inside a git repository."""
if path is None:
path = Path.cwd()
if not path.is_dir():
return False
try:
subprocess.run(
["git", "rev-parse", "--is-inside-work-tree"],
check=True,
capture_output=True,
cwd=path,
)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
def init_git_repo(project_path: Path, quiet: bool = False) -> tuple[bool, str | None]:
"""Initialize a git repository in the specified path."""
try:
original_cwd = Path.cwd()
os.chdir(project_path)
if not quiet:
console.print("[cyan]Initializing git repository...[/cyan]")
subprocess.run(["git", "init"], check=True, capture_output=True, text=True)
subprocess.run(["git", "add", "."], check=True, capture_output=True, text=True)
subprocess.run(["git", "commit", "-m", "Initial commit from Specify template"], check=True, capture_output=True, text=True)
if not quiet:
console.print("[green]✓[/green] Git repository initialized")
return True, None
except subprocess.CalledProcessError as e:
error_msg = f"Command: {' '.join(e.cmd)}\nExit code: {e.returncode}"
if e.stderr:
error_msg += f"\nError: {e.stderr.strip()}"
elif e.stdout:
error_msg += f"\nOutput: {e.stdout.strip()}"
if not quiet:
console.print(f"[red]Error initializing git repository:[/red] {e}")
return False, error_msg
finally:
os.chdir(original_cwd)
def handle_vscode_settings(sub_item, dest_file, rel_path, verbose=False, tracker=None) -> None:
"""Handle merging or copying of .vscode/settings.json files.
Note: when merge produces changes, rewritten output is normalized JSON and
existing JSONC comments/trailing commas are not preserved.
"""
def log(message, color="green"):
if verbose and not tracker:
console.print(f"[{color}]{message}[/] {rel_path}")
def atomic_write_json(target_file: Path, payload: dict[str, Any]) -> None:
"""Atomically write JSON while preserving existing mode bits when possible."""
temp_path: Path | None = None
try:
with tempfile.NamedTemporaryFile(
mode='w',
encoding='utf-8',
dir=target_file.parent,
prefix=f"{target_file.name}.",
suffix=".tmp",
delete=False,
) as f:
temp_path = Path(f.name)
json.dump(payload, f, indent=4)
f.write('\n')
if target_file.exists():
try:
existing_stat = target_file.stat()
os.chmod(temp_path, stat.S_IMODE(existing_stat.st_mode))
if hasattr(os, "chown"):
try:
os.chown(temp_path, existing_stat.st_uid, existing_stat.st_gid)
except PermissionError:
# Best-effort owner/group preservation without requiring elevated privileges.
pass
except OSError:
# Best-effort metadata preservation; data safety is prioritized.
pass
os.replace(temp_path, target_file)
except Exception:
if temp_path and temp_path.exists():
temp_path.unlink()
raise
try:
with open(sub_item, 'r', encoding='utf-8') as f:
# json5 natively supports comments and trailing commas (JSONC)
new_settings = json5.load(f)
if dest_file.exists():
merged = merge_json_files(dest_file, new_settings, verbose=verbose and not tracker)
if merged is not None:
atomic_write_json(dest_file, merged)
log("Merged:", "green")
log("Note: comments/trailing commas are normalized when rewritten", "yellow")
else:
log("Skipped merge (preserved existing settings)", "yellow")
else:
shutil.copy2(sub_item, dest_file)
log("Copied (no existing settings.json):", "blue")
except Exception as e:
log(f"Warning: Could not merge settings: {e}", "yellow")
if not dest_file.exists():
shutil.copy2(sub_item, dest_file)
def merge_json_files(existing_path: Path, new_content: Any, verbose: bool = False) -> dict[str, Any] | None:
"""Merge new JSON content into existing JSON file.
Performs a polite deep merge where:
- New keys are added
- Existing keys are preserved (not overwritten) unless both values are dictionaries
- Nested dictionaries are merged recursively only when both sides are dictionaries
- Lists and other values are preserved from base if they exist
Args:
existing_path: Path to existing JSON file
new_content: New JSON content to merge in
verbose: Whether to print merge details
Returns:
Merged JSON content as dict, or None if the existing file should be left untouched.
"""
# Load existing content first to have a safe fallback
existing_content = None
exists = existing_path.exists()
if exists:
try:
with open(existing_path, 'r', encoding='utf-8') as f:
# Handle comments (JSONC) natively with json5
# Note: json5 handles BOM automatically
existing_content = json5.load(f)
except FileNotFoundError:
# Handle race condition where file is deleted after exists() check
exists = False
except Exception as e:
if verbose:
console.print(f"[yellow]Warning: Could not read or parse existing JSON in {existing_path.name} ({e}).[/yellow]")
# Skip merge to preserve existing file if unparseable or inaccessible (e.g. PermissionError)
return None
# Validate template content
if not isinstance(new_content, dict):
if verbose:
console.print(f"[yellow]Warning: Template content for {existing_path.name} is not a dictionary. Preserving existing settings.[/yellow]")
return None
if not exists:
return new_content
# If existing content parsed but is not a dict, skip merge to avoid data loss
if not isinstance(existing_content, dict):
if verbose:
console.print(f"[yellow]Warning: Existing JSON in {existing_path.name} is not an object. Skipping merge to avoid data loss.[/yellow]")
return None
def deep_merge_polite(base: dict[str, Any], update: dict[str, Any]) -> dict[str, Any]:
"""Recursively merge update dict into base dict, preserving base values."""
result = base.copy()
for key, value in update.items():
if key not in result:
# Add new key
result[key] = value
elif isinstance(result[key], dict) and isinstance(value, dict):
# Recursively merge nested dictionaries
result[key] = deep_merge_polite(result[key], value)
else:
# Key already exists and values are not both dicts; preserve existing value.
# This ensures user settings aren't overwritten by template defaults.
pass
return result
merged = deep_merge_polite(existing_content, new_content)
# Detect if anything actually changed. If not, return None so the caller
# can skip rewriting the file (preserving user's comments/formatting).
if merged == existing_content:
return None
if verbose:
console.print(f"[cyan]Merged JSON file:[/cyan] {existing_path.name}")
return merged
def _display_project_path(project_root: Path, path: str | Path) -> str:
"""Return a stable POSIX-style display path for paths under a project."""
path_obj = Path(path)
try:
rel_path = path_obj.relative_to(project_root) if path_obj.is_absolute() else path_obj
except ValueError:
try:
rel_path = path_obj.resolve().relative_to(project_root.resolve())
except (OSError, ValueError):
return path_obj.as_posix()
return rel_path.as_posix()

View File

@@ -22,7 +22,9 @@ class TestCheckToolClaude:
fake_missing = tmp_path / "nonexistent" / "claude"
with patch("specify_cli.CLAUDE_LOCAL_PATH", fake_claude), \
patch("specify_cli._utils.CLAUDE_LOCAL_PATH", fake_claude), \
patch("specify_cli.CLAUDE_NPM_LOCAL_PATH", fake_missing), \
patch("specify_cli._utils.CLAUDE_NPM_LOCAL_PATH", fake_missing), \
patch("shutil.which", return_value=None):
assert check_tool("claude") is True
@@ -36,7 +38,9 @@ class TestCheckToolClaude:
fake_migrate = tmp_path / "nonexistent" / "claude"
with patch("specify_cli.CLAUDE_LOCAL_PATH", fake_migrate), \
patch("specify_cli._utils.CLAUDE_LOCAL_PATH", fake_migrate), \
patch("specify_cli.CLAUDE_NPM_LOCAL_PATH", fake_npm_claude), \
patch("specify_cli._utils.CLAUDE_NPM_LOCAL_PATH", fake_npm_claude), \
patch("shutil.which", return_value=None):
assert check_tool("claude") is True
@@ -45,7 +49,9 @@ class TestCheckToolClaude:
fake_missing = tmp_path / "nonexistent" / "claude"
with patch("specify_cli.CLAUDE_LOCAL_PATH", fake_missing), \
patch("specify_cli._utils.CLAUDE_LOCAL_PATH", fake_missing), \
patch("specify_cli.CLAUDE_NPM_LOCAL_PATH", fake_missing), \
patch("specify_cli._utils.CLAUDE_NPM_LOCAL_PATH", fake_missing), \
patch("shutil.which", return_value="/usr/local/bin/claude"):
assert check_tool("claude") is True
@@ -54,7 +60,9 @@ class TestCheckToolClaude:
fake_missing = tmp_path / "nonexistent" / "claude"
with patch("specify_cli.CLAUDE_LOCAL_PATH", fake_missing), \
patch("specify_cli._utils.CLAUDE_LOCAL_PATH", fake_missing), \
patch("specify_cli.CLAUDE_NPM_LOCAL_PATH", fake_missing), \
patch("specify_cli._utils.CLAUDE_NPM_LOCAL_PATH", fake_missing), \
patch("shutil.which", return_value=None):
assert check_tool("claude") is False
@@ -68,7 +76,9 @@ class TestCheckToolClaude:
tracker = MagicMock()
with patch("specify_cli.CLAUDE_LOCAL_PATH", fake_missing), \
patch("specify_cli._utils.CLAUDE_LOCAL_PATH", fake_missing), \
patch("specify_cli.CLAUDE_NPM_LOCAL_PATH", fake_npm_claude), \
patch("specify_cli._utils.CLAUDE_NPM_LOCAL_PATH", fake_npm_claude), \
patch("shutil.which", return_value=None):
result = check_tool("claude", tracker=tracker)

View File

@@ -0,0 +1,21 @@
"""Regression guard: utility and asset symbols importable from specify_cli."""
from specify_cli import (
run_command, check_tool, is_git_repo, init_git_repo,
handle_vscode_settings, merge_json_files,
get_speckit_version,
CLAUDE_LOCAL_PATH, CLAUDE_NPM_LOCAL_PATH,
)
from pathlib import Path
def test_utils_symbols_importable():
assert callable(check_tool)
assert callable(merge_json_files)
assert callable(is_git_repo)
def test_get_speckit_version_returns_string():
version = get_speckit_version()
assert isinstance(version, str) and len(version) > 0
def test_claude_paths_are_paths():
assert isinstance(CLAUDE_LOCAL_PATH, Path)
assert isinstance(CLAUDE_NPM_LOCAL_PATH, Path)