mirror of
https://github.com/github/spec-kit.git
synced 2026-07-04 04:45:43 +08:00
Compare commits
23 Commits
v0.12.2
...
copilot/ad
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5d75366fd5 | ||
|
|
7344071b7e | ||
|
|
a69d427e03 | ||
|
|
a8320d9b61 | ||
|
|
0825f508a7 | ||
|
|
eec1291896 | ||
|
|
7ff9c8bdd4 | ||
|
|
1015ff24da | ||
|
|
05798a9e70 | ||
|
|
bd04937927 | ||
|
|
1786d27c06 | ||
|
|
0a02369ebe | ||
|
|
e0495ebc38 | ||
|
|
cb87a410f8 | ||
|
|
0fd0bf6b9f | ||
|
|
d00509e770 | ||
|
|
c44cc245ed | ||
|
|
0c6cc4502c | ||
|
|
d78ead1802 | ||
|
|
b3a60f5fba | ||
|
|
b37f117cf9 | ||
|
|
a434e5a8ed | ||
|
|
1bda2f0cb4 |
@@ -2576,7 +2576,7 @@ def preset_list():
|
||||
@preset_app.command("add")
|
||||
def preset_add(
|
||||
preset_id: str = typer.Argument(None, help="Preset ID to install from catalog"),
|
||||
from_url: str = typer.Option(None, "--from", help="Install from a URL (ZIP file)"),
|
||||
from_url: str = typer.Option(None, "--from", help="Install from a URL (ZIP or .tar.gz/.tgz archive)"),
|
||||
dev: str = typer.Option(None, "--dev", help="Install from local directory (development mode)"),
|
||||
priority: int = typer.Option(10, "--priority", help="Resolution priority (lower = higher precedence, default 10)"),
|
||||
):
|
||||
@@ -2629,17 +2629,46 @@ def preset_add(
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
import tempfile
|
||||
from .extensions import detect_archive_format as _det_fmt
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
zip_path = Path(tmpdir) / "preset.zip"
|
||||
final_url = from_url
|
||||
archive_fmt = ""
|
||||
try:
|
||||
with urllib.request.urlopen(from_url, timeout=60) as response:
|
||||
zip_path.write_bytes(response.read())
|
||||
final_url = response.geturl()
|
||||
# Re-validate scheme after any redirect (scheme-downgrade
|
||||
# guard). Check BEFORE reading the body so an insecure
|
||||
# redirect cannot cause us to fetch the payload.
|
||||
_fp = _urlparse(final_url)
|
||||
_fl = _fp.hostname in ("localhost", "127.0.0.1", "::1")
|
||||
if _fp.scheme != "https" and not (_fp.scheme == "http" and _fl):
|
||||
console.print(f"[red]Error:[/red] URL was redirected to a non-HTTPS URL: {final_url}")
|
||||
raise typer.Exit(1)
|
||||
content_type = response.headers.get("Content-Type", "")
|
||||
# Prefer the post-redirect URL for format detection;
|
||||
# fall back to the original URL only as a last hint.
|
||||
archive_fmt = _det_fmt(final_url, content_type)
|
||||
if not archive_fmt:
|
||||
archive_fmt = _det_fmt(from_url)
|
||||
archive_data = response.read()
|
||||
except urllib.error.URLError as e:
|
||||
console.print(f"[red]Error:[/red] Failed to download: {e}")
|
||||
raise typer.Exit(1)
|
||||
|
||||
manifest = manager.install_from_zip(zip_path, speckit_version, priority)
|
||||
if not archive_fmt:
|
||||
console.print("[red]Error:[/red] Could not determine archive format from URL or Content-Type.")
|
||||
console.print("Ensure the URL points to a .zip or .tar.gz/.tgz file.")
|
||||
raise typer.Exit(1)
|
||||
|
||||
suffix = ".tar.gz" if archive_fmt == "tar.gz" else ".zip"
|
||||
archive_path = Path(tmpdir) / f"preset{suffix}"
|
||||
try:
|
||||
archive_path.write_bytes(archive_data)
|
||||
manifest = manager.install_from_zip(archive_path, speckit_version, priority)
|
||||
except OSError as e:
|
||||
console.print(f"[red]Error:[/red] Failed to save or install archive: {e}")
|
||||
raise typer.Exit(1)
|
||||
|
||||
console.print(f"[green]✓[/green] Preset '{manifest.name}' v{manifest.version} installed (priority {priority})")
|
||||
|
||||
@@ -3573,7 +3602,7 @@ def catalog_remove(
|
||||
def extension_add(
|
||||
extension: str = typer.Argument(help="Extension name or path"),
|
||||
dev: bool = typer.Option(False, "--dev", help="Install from local directory"),
|
||||
from_url: Optional[str] = typer.Option(None, "--from", help="Install from custom URL"),
|
||||
from_url: Optional[str] = typer.Option(None, "--from", help="Install from custom URL (ZIP or .tar.gz/.tgz archive)"),
|
||||
priority: int = typer.Option(10, "--priority", help="Resolution priority (lower = higher precedence, default 10)"),
|
||||
):
|
||||
"""Install an extension."""
|
||||
@@ -3612,10 +3641,11 @@ def extension_add(
|
||||
manifest = manager.install_from_directory(source_path, speckit_version, priority=priority)
|
||||
|
||||
elif from_url:
|
||||
# Install from URL (ZIP file)
|
||||
# Install from URL (ZIP or tar.gz archive)
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
from urllib.parse import urlparse
|
||||
from .extensions import detect_archive_format
|
||||
|
||||
# Validate URL
|
||||
parsed = urlparse(from_url)
|
||||
@@ -3631,25 +3661,53 @@ def extension_add(
|
||||
console.print("Only install extensions from sources you trust.\n")
|
||||
console.print(f"Downloading from {from_url}...")
|
||||
|
||||
# Download ZIP to temp location
|
||||
# Download archive to temp location; detect format from the
|
||||
# post-redirect URL (with Content-Type fallback), only using
|
||||
# the original URL as a last hint.
|
||||
download_dir = project_root / ".specify" / "extensions" / ".cache" / "downloads"
|
||||
download_dir.mkdir(parents=True, exist_ok=True)
|
||||
zip_path = download_dir / f"{extension}-url-download.zip"
|
||||
archive_fmt = ""
|
||||
archive_path = None
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(from_url, timeout=60) as response:
|
||||
zip_data = response.read()
|
||||
zip_path.write_bytes(zip_data)
|
||||
final_url = response.geturl()
|
||||
# Re-validate scheme after any redirect (scheme-downgrade
|
||||
# guard). Check BEFORE reading the body so an insecure
|
||||
# redirect cannot cause us to fetch the payload.
|
||||
_fp = urlparse(final_url)
|
||||
_fl = _fp.hostname in ("localhost", "127.0.0.1", "::1")
|
||||
if _fp.scheme != "https" and not (_fp.scheme == "http" and _fl):
|
||||
console.print(f"[red]Error:[/red] URL was redirected to a non-HTTPS URL: {final_url}")
|
||||
raise typer.Exit(1)
|
||||
content_type = response.headers.get("Content-Type", "")
|
||||
archive_fmt = detect_archive_format(final_url, content_type)
|
||||
if not archive_fmt:
|
||||
archive_fmt = detect_archive_format(from_url)
|
||||
archive_data = response.read()
|
||||
|
||||
# Install from downloaded ZIP
|
||||
manifest = manager.install_from_zip(zip_path, speckit_version, priority=priority)
|
||||
if not archive_fmt:
|
||||
console.print("[red]Error:[/red] Could not determine archive format from URL or Content-Type.")
|
||||
console.print("Ensure the URL points to a .zip or .tar.gz/.tgz file.")
|
||||
raise typer.Exit(1)
|
||||
|
||||
suffix = ".tar.gz" if archive_fmt == "tar.gz" else ".zip"
|
||||
safe_name = Path(extension).name or "extension"
|
||||
archive_path = download_dir / f"{safe_name}-url-download{suffix}"
|
||||
archive_path.write_bytes(archive_data)
|
||||
|
||||
# Install from downloaded archive
|
||||
manifest = manager.install_from_zip(archive_path, speckit_version, priority=priority)
|
||||
except urllib.error.URLError as e:
|
||||
console.print(f"[red]Error:[/red] Failed to download from {from_url}: {e}")
|
||||
raise typer.Exit(1)
|
||||
except OSError as e:
|
||||
console.print(f"[red]Error:[/red] Failed to save or install archive: {e}")
|
||||
raise typer.Exit(1)
|
||||
finally:
|
||||
# Clean up downloaded ZIP
|
||||
if zip_path.exists():
|
||||
zip_path.unlink()
|
||||
# Clean up the downloaded archive
|
||||
if archive_path is not None and archive_path.exists():
|
||||
archive_path.unlink()
|
||||
|
||||
else:
|
||||
# Try bundled extensions first (shipped with spec-kit)
|
||||
@@ -4301,29 +4359,55 @@ def extension_update(
|
||||
backup_hooks[hook_name] = ext_hooks
|
||||
|
||||
# 5. Download new version
|
||||
zip_path = catalog.download_extension(extension_id)
|
||||
archive_path = catalog.download_extension(extension_id)
|
||||
try:
|
||||
# 6. Validate extension ID from ZIP BEFORE modifying installation
|
||||
# Handle both root-level and nested extension.yml (GitHub auto-generated ZIPs)
|
||||
with zipfile.ZipFile(zip_path, "r") as zf:
|
||||
import yaml
|
||||
manifest_data = None
|
||||
namelist = zf.namelist()
|
||||
# 6. Validate extension ID from archive BEFORE modifying installation
|
||||
# Handle both root-level and nested extension.yml (GitHub auto-generated archives)
|
||||
from .extensions import detect_archive_format
|
||||
import tarfile
|
||||
archive_fmt = detect_archive_format(str(archive_path))
|
||||
import yaml
|
||||
manifest_data = None
|
||||
|
||||
# First try root-level extension.yml
|
||||
if "extension.yml" in namelist:
|
||||
with zf.open("extension.yml") as f:
|
||||
manifest_data = yaml.safe_load(f) or {}
|
||||
else:
|
||||
# Look for extension.yml in a single top-level subdirectory
|
||||
# (e.g., "repo-name-branch/extension.yml")
|
||||
manifest_paths = [n for n in namelist if n.endswith("/extension.yml") and n.count("/") == 1]
|
||||
if len(manifest_paths) == 1:
|
||||
with zf.open(manifest_paths[0]) as f:
|
||||
if archive_fmt == "tar.gz":
|
||||
with tarfile.open(archive_path, "r:gz") as tf:
|
||||
# First try root-level extension.yml
|
||||
try:
|
||||
m = tf.getmember("extension.yml")
|
||||
f = tf.extractfile(m)
|
||||
if f is not None:
|
||||
with f:
|
||||
manifest_data = yaml.safe_load(f.read()) or {}
|
||||
except KeyError:
|
||||
# extension.yml not present at archive root; use nested fallback below.
|
||||
manifest_data = None
|
||||
# Fall back to nested-directory search if root-level
|
||||
# was missing (KeyError) or not a regular file (None).
|
||||
if manifest_data is None:
|
||||
members = [m for m in tf.getmembers() if m.name.endswith("/extension.yml") and m.name.count("/") == 1]
|
||||
if len(members) == 1:
|
||||
f = tf.extractfile(members[0])
|
||||
if f is not None:
|
||||
with f:
|
||||
manifest_data = yaml.safe_load(f.read()) or {}
|
||||
else:
|
||||
with zipfile.ZipFile(archive_path, "r") as zf:
|
||||
namelist = zf.namelist()
|
||||
|
||||
# First try root-level extension.yml
|
||||
if "extension.yml" in namelist:
|
||||
with zf.open("extension.yml") as f:
|
||||
manifest_data = yaml.safe_load(f) or {}
|
||||
else:
|
||||
# Look for extension.yml in a single top-level subdirectory
|
||||
# (e.g., "repo-name-branch/extension.yml")
|
||||
manifest_paths = [n for n in namelist if n.endswith("/extension.yml") and n.count("/") == 1]
|
||||
if len(manifest_paths) == 1:
|
||||
with zf.open(manifest_paths[0]) as f:
|
||||
manifest_data = yaml.safe_load(f) or {}
|
||||
|
||||
if manifest_data is None:
|
||||
raise ValueError("Downloaded extension archive is missing 'extension.yml'")
|
||||
if manifest_data is None:
|
||||
raise ValueError("Downloaded extension archive is missing 'extension.yml'")
|
||||
|
||||
zip_extension_id = manifest_data.get("extension", {}).get("id")
|
||||
if zip_extension_id != extension_id:
|
||||
@@ -4335,7 +4419,7 @@ def extension_update(
|
||||
manager.remove(extension_id, keep_config=True)
|
||||
|
||||
# 8. Install new version
|
||||
_ = manager.install_from_zip(zip_path, speckit_version)
|
||||
_ = manager.install_from_zip(archive_path, speckit_version)
|
||||
|
||||
# Restore user config files from backup after successful install.
|
||||
new_extension_dir = manager.extensions_dir / extension_id
|
||||
@@ -4381,9 +4465,9 @@ def extension_update(
|
||||
hook["enabled"] = False
|
||||
hook_executor.save_project_config(config)
|
||||
finally:
|
||||
# Clean up downloaded ZIP
|
||||
if zip_path.exists():
|
||||
zip_path.unlink()
|
||||
# Clean up downloaded archive
|
||||
if archive_path.exists():
|
||||
archive_path.unlink()
|
||||
|
||||
# 10. Clean up backup on success
|
||||
if backup_base.exists():
|
||||
@@ -4875,6 +4959,59 @@ def workflow_list():
|
||||
console.print()
|
||||
|
||||
|
||||
def _extract_workflow_yml(archive_path: Path, archive_fmt: str) -> bytes:
|
||||
"""Extract ``workflow.yml`` from a ZIP or ``.tar.gz`` archive.
|
||||
|
||||
Searches the archive root and a single nested top-level subdirectory
|
||||
(e.g., ``repo-name-1.0/workflow.yml``).
|
||||
|
||||
Args:
|
||||
archive_path: Path to the downloaded archive.
|
||||
archive_fmt: ``"zip"`` or ``"tar.gz"``.
|
||||
|
||||
Returns:
|
||||
Raw bytes of the ``workflow.yml`` file.
|
||||
|
||||
Raises:
|
||||
ValueError: If no ``workflow.yml`` is found in the archive.
|
||||
"""
|
||||
import tarfile
|
||||
|
||||
if archive_fmt == "tar.gz":
|
||||
with tarfile.open(archive_path, "r:gz") as tf:
|
||||
# Try root-level first.
|
||||
try:
|
||||
f = tf.extractfile(tf.getmember("workflow.yml"))
|
||||
if f is not None:
|
||||
with f:
|
||||
return f.read()
|
||||
except KeyError:
|
||||
pass # Root-level workflow.yml not found; fall through to subdirectory search below.
|
||||
# Look in a single top-level subdirectory.
|
||||
candidates = [
|
||||
m for m in tf.getmembers()
|
||||
if m.name.endswith("/workflow.yml") and m.name.count("/") == 1
|
||||
]
|
||||
if len(candidates) == 1:
|
||||
f = tf.extractfile(candidates[0])
|
||||
if f is not None:
|
||||
with f:
|
||||
return f.read()
|
||||
else:
|
||||
with zipfile.ZipFile(archive_path, "r") as zf:
|
||||
namelist = zf.namelist()
|
||||
if "workflow.yml" in namelist:
|
||||
return zf.read("workflow.yml")
|
||||
candidates = [
|
||||
n for n in namelist
|
||||
if n.endswith("/workflow.yml") and n.count("/") == 1
|
||||
]
|
||||
if len(candidates) == 1:
|
||||
return zf.read(candidates[0])
|
||||
|
||||
raise ValueError("No workflow.yml found in the downloaded archive")
|
||||
|
||||
|
||||
@workflow_app.command("add")
|
||||
def workflow_add(
|
||||
source: str = typer.Argument(..., help="Workflow ID, URL, or local path"),
|
||||
@@ -4928,6 +5065,7 @@ def workflow_add(
|
||||
from ipaddress import ip_address
|
||||
from urllib.parse import urlparse
|
||||
from urllib.request import urlopen # noqa: S310
|
||||
from .extensions import detect_archive_format
|
||||
|
||||
parsed_src = urlparse(source)
|
||||
src_host = parsed_src.hostname or ""
|
||||
@@ -4958,18 +5096,53 @@ def workflow_add(
|
||||
if final_parsed.scheme != "https" and not (final_parsed.scheme == "http" and final_lb):
|
||||
console.print(f"[red]Error:[/red] URL redirected to non-HTTPS: {final_url}")
|
||||
raise typer.Exit(1)
|
||||
with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp:
|
||||
tmp.write(resp.read())
|
||||
tmp_path = Path(tmp.name)
|
||||
|
||||
# Detect archive format from the final URL or Content-Type header.
|
||||
archive_fmt = detect_archive_format(final_url)
|
||||
if not archive_fmt:
|
||||
content_type = resp.headers.get("Content-Type", "")
|
||||
archive_fmt = detect_archive_format(final_url, content_type)
|
||||
|
||||
raw_data = resp.read()
|
||||
except typer.Exit:
|
||||
raise
|
||||
except Exception as exc:
|
||||
console.print(f"[red]Error:[/red] Failed to download workflow: {exc}")
|
||||
raise typer.Exit(1)
|
||||
|
||||
tmp_path = None
|
||||
try:
|
||||
if archive_fmt in ("tar.gz", "zip"):
|
||||
# Extract workflow.yml from the archive.
|
||||
suffix = ".tar.gz" if archive_fmt == "tar.gz" else ".zip"
|
||||
arc_tmp_path = None
|
||||
with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as arc_tmp:
|
||||
arc_tmp_path = Path(arc_tmp.name)
|
||||
arc_tmp.write(raw_data)
|
||||
try:
|
||||
wf_yaml = _extract_workflow_yml(arc_tmp_path, archive_fmt)
|
||||
with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp:
|
||||
tmp_path = Path(tmp.name)
|
||||
tmp.write(wf_yaml)
|
||||
finally:
|
||||
if arc_tmp_path is not None:
|
||||
arc_tmp_path.unlink(missing_ok=True)
|
||||
else:
|
||||
# Treat as a plain YAML file (existing behaviour).
|
||||
with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp:
|
||||
tmp.write(raw_data)
|
||||
tmp_path = Path(tmp.name)
|
||||
except typer.Exit:
|
||||
raise
|
||||
except Exception as exc:
|
||||
console.print(f"[red]Error:[/red] Failed to process downloaded workflow: {exc}")
|
||||
raise typer.Exit(1)
|
||||
|
||||
try:
|
||||
_validate_and_install_local(tmp_path, source)
|
||||
finally:
|
||||
tmp_path.unlink(missing_ok=True)
|
||||
if tmp_path is not None:
|
||||
tmp_path.unlink(missing_ok=True)
|
||||
return
|
||||
|
||||
# Try as a local file/directory
|
||||
@@ -4978,6 +5151,27 @@ def workflow_add(
|
||||
if source_path.is_file() and source_path.suffix in (".yml", ".yaml"):
|
||||
_validate_and_install_local(source_path, str(source_path))
|
||||
return
|
||||
elif source_path.is_file() and (
|
||||
source.lower().endswith(".tar.gz") or source.lower().endswith(".tgz") or source.lower().endswith(".zip")
|
||||
):
|
||||
# Local archive file containing workflow.yml
|
||||
from .extensions import detect_archive_format
|
||||
local_fmt = detect_archive_format(source)
|
||||
try:
|
||||
wf_yaml = _extract_workflow_yml(source_path, local_fmt)
|
||||
except Exception as exc:
|
||||
console.print(f"[red]Error:[/red] Failed to extract workflow from archive: {exc}")
|
||||
raise typer.Exit(1)
|
||||
import tempfile
|
||||
tmp_local = None
|
||||
with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp:
|
||||
tmp_local = Path(tmp.name)
|
||||
tmp.write(wf_yaml)
|
||||
try:
|
||||
_validate_and_install_local(tmp_local, str(source_path))
|
||||
finally:
|
||||
tmp_local.unlink(missing_ok=True)
|
||||
return
|
||||
elif source_path.is_dir():
|
||||
wf_file = source_path / "workflow.yml"
|
||||
if not wf_file.exists():
|
||||
@@ -5041,6 +5235,7 @@ def workflow_add(
|
||||
|
||||
try:
|
||||
from urllib.request import urlopen # noqa: S310 — URL comes from catalog
|
||||
from .extensions import detect_archive_format
|
||||
|
||||
workflow_dir.mkdir(parents=True, exist_ok=True)
|
||||
with urlopen(workflow_url, timeout=30) as response: # noqa: S310
|
||||
@@ -5063,7 +5258,32 @@ def workflow_add(
|
||||
f"[red]Error:[/red] Workflow '{source}' redirected to non-HTTPS URL: {final_url}"
|
||||
)
|
||||
raise typer.Exit(1)
|
||||
workflow_file.write_bytes(response.read())
|
||||
|
||||
# Detect archive format from the final URL or Content-Type header.
|
||||
cat_archive_fmt = detect_archive_format(final_url)
|
||||
if not cat_archive_fmt:
|
||||
cat_ct = response.headers.get("Content-Type", "")
|
||||
cat_archive_fmt = detect_archive_format(final_url, cat_ct)
|
||||
|
||||
raw_response = response.read()
|
||||
|
||||
if cat_archive_fmt in ("tar.gz", "zip"):
|
||||
# Download URL points to an archive — extract workflow.yml from it.
|
||||
suffix = ".tar.gz" if cat_archive_fmt == "tar.gz" else ".zip"
|
||||
arc_tmp = None
|
||||
with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as arc_f:
|
||||
arc_tmp = Path(arc_f.name)
|
||||
arc_f.write(raw_response)
|
||||
try:
|
||||
wf_yaml_bytes = _extract_workflow_yml(arc_tmp, cat_archive_fmt)
|
||||
finally:
|
||||
if arc_tmp is not None:
|
||||
arc_tmp.unlink(missing_ok=True)
|
||||
workflow_file.write_bytes(wf_yaml_bytes)
|
||||
else:
|
||||
workflow_file.write_bytes(raw_response)
|
||||
except typer.Exit:
|
||||
raise
|
||||
except Exception as exc:
|
||||
if workflow_dir.exists():
|
||||
import shutil
|
||||
|
||||
@@ -9,6 +9,8 @@ without bloating the core framework.
|
||||
import json
|
||||
import hashlib
|
||||
import os
|
||||
import sys
|
||||
import tarfile
|
||||
import tempfile
|
||||
import zipfile
|
||||
import shutil
|
||||
@@ -106,6 +108,137 @@ def normalize_priority(value: Any, default: int = 10) -> int:
|
||||
return priority if priority >= 1 else default
|
||||
|
||||
|
||||
def detect_archive_format(url: str, content_type: str = "") -> str:
|
||||
"""Detect archive format from URL path extension or Content-Type header.
|
||||
|
||||
Args:
|
||||
url: URL or file path to inspect.
|
||||
content_type: Optional ``Content-Type`` header value from the HTTP response.
|
||||
|
||||
Returns:
|
||||
``"zip"`` for ZIP archives, ``"tar.gz"`` for gzipped tarballs, or ``""``
|
||||
when the format cannot be determined.
|
||||
"""
|
||||
# Strip query-string / fragment before examining the path extension.
|
||||
url_path = url.split("?")[0].split("#")[0].lower()
|
||||
if url_path.endswith(".zip"):
|
||||
return "zip"
|
||||
if url_path.endswith(".tar.gz") or url_path.endswith(".tgz"):
|
||||
return "tar.gz"
|
||||
|
||||
# Fall back to Content-Type header inspection.
|
||||
ct = content_type.lower()
|
||||
if "application/zip" in ct or "application/x-zip" in ct:
|
||||
return "zip"
|
||||
if any(
|
||||
t in ct
|
||||
for t in (
|
||||
"application/gzip",
|
||||
"application/x-gzip",
|
||||
"application/x-tar+gzip",
|
||||
)
|
||||
):
|
||||
return "tar.gz"
|
||||
|
||||
return ""
|
||||
|
||||
|
||||
def safe_extract_tarball(
|
||||
archive_path: Path,
|
||||
dest_dir: Path,
|
||||
error_class: "type[Exception]" = Exception,
|
||||
) -> None:
|
||||
"""Safely extract a ``.tar.gz`` or ``.tgz`` archive into *dest_dir*.
|
||||
|
||||
All members are validated before extraction to prevent *tar slip*
|
||||
(path traversal) attacks. Symlinks, hard links, and special files
|
||||
(devices, FIFOs, etc.) are rejected.
|
||||
|
||||
On Python 3.12 and later the ``"data"`` extraction filter is applied
|
||||
for an additional layer of OS-level protection. On earlier versions
|
||||
the explicit member list (containing only pre-validated regular files
|
||||
and directories) is passed to ``extractall()`` — since all symlinks are
|
||||
already rejected in the validation phase, no archive-introduced symlink
|
||||
can be followed during extraction.
|
||||
|
||||
Args:
|
||||
archive_path: Path to the ``.tar.gz``/``.tgz`` archive.
|
||||
dest_dir: Destination directory (must already exist).
|
||||
error_class: Exception class to raise on unsafe entries.
|
||||
|
||||
Raises:
|
||||
error_class: If any member is unsafe or the archive cannot be read.
|
||||
"""
|
||||
dest_resolved = dest_dir.resolve()
|
||||
# Tar metadata member types to skip during validation — they carry no
|
||||
# extractable payload and are generated automatically by many common
|
||||
# archiving tools (e.g. PAX headers, GNU longname/longlink entries).
|
||||
# GNUTYPE_SPARSE is intentionally excluded: it carries a real file payload
|
||||
# and tarfile.TarInfo.isreg() returns True for it, so it passes the
|
||||
# regular-file check below and is extracted correctly.
|
||||
_TAR_METADATA_TYPES = (
|
||||
tarfile.XHDTYPE, # PAX extended header
|
||||
tarfile.XGLTYPE, # PAX global extended header
|
||||
tarfile.SOLARIS_XHDTYPE, # Solaris PAX extended header
|
||||
tarfile.GNUTYPE_LONGNAME, # GNU long path name (metadata only)
|
||||
tarfile.GNUTYPE_LONGLINK, # GNU long link name (metadata only)
|
||||
)
|
||||
|
||||
try:
|
||||
with tarfile.open(archive_path, "r:gz") as tf:
|
||||
members = tf.getmembers()
|
||||
safe_members = []
|
||||
|
||||
# Validate every member before extracting anything.
|
||||
for member in members:
|
||||
# Reject absolute paths and any path component that is "..".
|
||||
if os.path.isabs(member.name) or any(
|
||||
part == ".." for part in member.name.replace("\\", "/").split("/")
|
||||
):
|
||||
raise error_class(
|
||||
f"Unsafe path in tar archive: {member.name} (potential path traversal)"
|
||||
)
|
||||
|
||||
# Confirm the resolved path stays inside dest_dir.
|
||||
member_path = (dest_dir / member.name).resolve()
|
||||
try:
|
||||
member_path.relative_to(dest_resolved)
|
||||
except ValueError:
|
||||
raise error_class(
|
||||
f"Unsafe path in tar archive: {member.name} (potential path traversal)"
|
||||
)
|
||||
|
||||
# Skip tar metadata members — they carry no extractable payload.
|
||||
if member.type in _TAR_METADATA_TYPES:
|
||||
continue
|
||||
|
||||
# Reject symlinks and hard links.
|
||||
if member.issym() or member.islnk():
|
||||
raise error_class(
|
||||
f"Symlinks are not allowed in archive: {member.name}"
|
||||
)
|
||||
|
||||
# Reject devices, FIFOs and other special file types.
|
||||
if not (member.isreg() or member.isdir()):
|
||||
raise error_class(
|
||||
f"Non-regular file in archive: {member.name}"
|
||||
)
|
||||
|
||||
safe_members.append(member)
|
||||
|
||||
# Extract — use the "data" filter on Python 3.12+ for extra hardening.
|
||||
# On all versions pass only the pre-validated members so that no
|
||||
# unvetted entry (added concurrently or via a race) slips through.
|
||||
if sys.version_info >= (3, 12):
|
||||
tf.extractall(dest_dir, members=safe_members, filter="data") # type: ignore[call-arg]
|
||||
else:
|
||||
tf.extractall(dest_dir, members=safe_members) # noqa: S202 — validated above
|
||||
except error_class:
|
||||
raise
|
||||
except (tarfile.TarError, OSError) as e:
|
||||
raise error_class(f"Failed to read archive {archive_path}: {e}") from e
|
||||
|
||||
|
||||
@dataclass
|
||||
class CatalogEntry:
|
||||
"""Represents a single catalog entry in the catalog stack."""
|
||||
@@ -1202,10 +1335,10 @@ class ExtensionManager:
|
||||
speckit_version: str,
|
||||
priority: int = 10,
|
||||
) -> ExtensionManifest:
|
||||
"""Install extension from ZIP file.
|
||||
"""Install extension from a ZIP or ``.tar.gz``/``.tgz`` archive.
|
||||
|
||||
Args:
|
||||
zip_path: Path to extension ZIP file
|
||||
zip_path: Path to the extension archive (ZIP or gzipped tarball).
|
||||
speckit_version: Current spec-kit version
|
||||
priority: Resolution priority (lower = higher precedence, default 10)
|
||||
|
||||
@@ -1213,7 +1346,8 @@ class ExtensionManager:
|
||||
Installed extension manifest
|
||||
|
||||
Raises:
|
||||
ValidationError: If manifest is invalid or priority is invalid
|
||||
ValidationError: If manifest is invalid, the archive is unsafe, or
|
||||
priority is invalid
|
||||
CompatibilityError: If extension is incompatible
|
||||
"""
|
||||
# Validate priority early
|
||||
@@ -1223,21 +1357,27 @@ class ExtensionManager:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
temp_path = Path(tmpdir)
|
||||
|
||||
# Extract ZIP safely (prevent Zip Slip attack)
|
||||
with zipfile.ZipFile(zip_path, 'r') as zf:
|
||||
# Validate all paths first before extracting anything
|
||||
temp_path_resolved = temp_path.resolve()
|
||||
for member in zf.namelist():
|
||||
member_path = (temp_path / member).resolve()
|
||||
# Use is_relative_to for safe path containment check
|
||||
try:
|
||||
member_path.relative_to(temp_path_resolved)
|
||||
except ValueError:
|
||||
raise ValidationError(
|
||||
f"Unsafe path in ZIP archive: {member} (potential path traversal)"
|
||||
)
|
||||
# Only extract after all paths are validated
|
||||
zf.extractall(temp_path)
|
||||
archive_fmt = detect_archive_format(str(zip_path))
|
||||
|
||||
if archive_fmt == "tar.gz":
|
||||
# Extract tarball safely (prevent tar slip attack)
|
||||
safe_extract_tarball(zip_path, temp_path, ValidationError)
|
||||
else:
|
||||
# Extract ZIP safely (prevent Zip Slip attack)
|
||||
with zipfile.ZipFile(zip_path, 'r') as zf:
|
||||
# Validate all paths first before extracting anything
|
||||
temp_path_resolved = temp_path.resolve()
|
||||
for member in zf.namelist():
|
||||
member_path = (temp_path / member).resolve()
|
||||
# Use is_relative_to for safe path containment check
|
||||
try:
|
||||
member_path.relative_to(temp_path_resolved)
|
||||
except ValueError:
|
||||
raise ValidationError(
|
||||
f"Unsafe path in ZIP archive: {member} (potential path traversal)"
|
||||
)
|
||||
# Only extract after all paths are validated
|
||||
zf.extractall(temp_path)
|
||||
|
||||
# Find extension directory (may be nested)
|
||||
extension_dir = temp_path
|
||||
@@ -1251,7 +1391,7 @@ class ExtensionManager:
|
||||
manifest_path = extension_dir / "extension.yml"
|
||||
|
||||
if not manifest_path.exists():
|
||||
raise ValidationError("No extension.yml found in ZIP file")
|
||||
raise ValidationError("No extension.yml found in archive")
|
||||
|
||||
# Install from extracted directory
|
||||
return self.install_from_directory(extension_dir, speckit_version, priority=priority)
|
||||
@@ -1965,14 +2105,18 @@ class ExtensionCatalog:
|
||||
return None
|
||||
|
||||
def download_extension(self, extension_id: str, target_dir: Optional[Path] = None) -> Path:
|
||||
"""Download extension ZIP from catalog.
|
||||
"""Download extension archive from catalog.
|
||||
|
||||
Supports both ZIP (``.zip``) and gzipped tarball (``.tar.gz``/``.tgz``)
|
||||
archives. The format is detected from the download URL's path extension;
|
||||
when ambiguous the ``Content-Type`` header is used as a fallback.
|
||||
|
||||
Args:
|
||||
extension_id: ID of the extension to download
|
||||
target_dir: Directory to save ZIP file (defaults to temp directory)
|
||||
target_dir: Directory to save the archive (defaults to cache directory)
|
||||
|
||||
Returns:
|
||||
Path to downloaded ZIP file
|
||||
Path to downloaded archive file
|
||||
|
||||
Raises:
|
||||
ExtensionError: If extension not found or download fails
|
||||
@@ -2011,21 +2155,60 @@ class ExtensionCatalog:
|
||||
target_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
version = ext_info.get("version", "unknown")
|
||||
zip_filename = f"{extension_id}-{version}.zip"
|
||||
zip_path = target_dir / zip_filename
|
||||
|
||||
# Download the ZIP file
|
||||
# Download the archive. Determine the archive format from the
|
||||
# post-redirect URL first (with Content-Type fallback); only use the
|
||||
# original `download_url` as a last hint if the final URL gives no
|
||||
# signal.
|
||||
final_url = download_url
|
||||
archive_fmt = ""
|
||||
try:
|
||||
with self._open_url(download_url, timeout=60) as response:
|
||||
zip_data = response.read()
|
||||
|
||||
zip_path.write_bytes(zip_data)
|
||||
return zip_path
|
||||
final_url = response.geturl()
|
||||
# Re-validate scheme after any redirect to guard against
|
||||
# scheme-downgrade. Validate BEFORE reading the body so a
|
||||
# malicious redirect cannot cause us to fetch the payload
|
||||
# over an insecure scheme.
|
||||
_final_parsed = urlparse(final_url)
|
||||
_final_is_localhost = _final_parsed.hostname in (
|
||||
"localhost",
|
||||
"127.0.0.1",
|
||||
"::1",
|
||||
)
|
||||
if _final_parsed.scheme != "https" and not (
|
||||
_final_parsed.scheme == "http" and _final_is_localhost
|
||||
):
|
||||
raise ExtensionError(
|
||||
f"Extension download URL was redirected to a non-HTTPS URL: {final_url}"
|
||||
)
|
||||
content_type = response.headers.get("Content-Type", "")
|
||||
archive_fmt = detect_archive_format(final_url, content_type)
|
||||
if not archive_fmt:
|
||||
archive_fmt = detect_archive_format(download_url)
|
||||
archive_data = response.read()
|
||||
|
||||
except urllib.error.URLError as e:
|
||||
raise ExtensionError(f"Failed to download extension from {download_url}: {e}")
|
||||
except IOError as e:
|
||||
raise ExtensionError(f"Failed to save extension ZIP: {e}")
|
||||
raise ExtensionError(f"Failed to read extension archive from {download_url}: {e}")
|
||||
|
||||
# Choose file extension based on detected format.
|
||||
if not archive_fmt:
|
||||
raise ExtensionError(
|
||||
f"Could not determine archive format for {download_url}. "
|
||||
"Ensure the URL points to a .zip or .tar.gz/.tgz file."
|
||||
)
|
||||
if archive_fmt == "tar.gz":
|
||||
archive_filename = f"{extension_id}-{version}.tar.gz"
|
||||
else:
|
||||
archive_filename = f"{extension_id}-{version}.zip"
|
||||
|
||||
archive_path = target_dir / archive_filename
|
||||
try:
|
||||
archive_path.write_bytes(archive_data)
|
||||
except IOError as e:
|
||||
raise ExtensionError(f"Failed to save extension archive: {e}")
|
||||
return archive_path
|
||||
|
||||
def clear_cache(self):
|
||||
"""Clear the catalog cache (both legacy and URL-hash-based files)."""
|
||||
|
||||
@@ -27,7 +27,7 @@ import yaml
|
||||
from packaging import version as pkg_version
|
||||
from packaging.specifiers import SpecifierSet, InvalidSpecifier
|
||||
|
||||
from .extensions import ExtensionRegistry, normalize_priority
|
||||
from .extensions import ExtensionRegistry, normalize_priority, detect_archive_format, safe_extract_tarball
|
||||
|
||||
|
||||
def _substitute_core_template(
|
||||
@@ -1604,10 +1604,10 @@ class PresetManager:
|
||||
speckit_version: str,
|
||||
priority: int = 10,
|
||||
) -> PresetManifest:
|
||||
"""Install preset from ZIP file.
|
||||
"""Install preset from a ZIP or ``.tar.gz``/``.tgz`` archive.
|
||||
|
||||
Args:
|
||||
zip_path: Path to preset ZIP file
|
||||
zip_path: Path to the preset archive (ZIP or gzipped tarball).
|
||||
speckit_version: Current spec-kit version
|
||||
priority: Resolution priority (lower = higher precedence, default 10)
|
||||
|
||||
@@ -1615,7 +1615,8 @@ class PresetManager:
|
||||
Installed preset manifest
|
||||
|
||||
Raises:
|
||||
PresetValidationError: If manifest is invalid or priority is invalid
|
||||
PresetValidationError: If manifest is invalid, the archive is unsafe,
|
||||
or priority is invalid
|
||||
PresetCompatibilityError: If pack is incompatible
|
||||
"""
|
||||
# Validate priority early
|
||||
@@ -1625,18 +1626,24 @@ class PresetManager:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
temp_path = Path(tmpdir)
|
||||
|
||||
with zipfile.ZipFile(zip_path, 'r') as zf:
|
||||
temp_path_resolved = temp_path.resolve()
|
||||
for member in zf.namelist():
|
||||
member_path = (temp_path / member).resolve()
|
||||
try:
|
||||
member_path.relative_to(temp_path_resolved)
|
||||
except ValueError:
|
||||
raise PresetValidationError(
|
||||
f"Unsafe path in ZIP archive: {member} "
|
||||
"(potential path traversal)"
|
||||
)
|
||||
zf.extractall(temp_path)
|
||||
archive_fmt = detect_archive_format(str(zip_path))
|
||||
|
||||
if archive_fmt == "tar.gz":
|
||||
# Extract tarball safely (prevent tar slip attack)
|
||||
safe_extract_tarball(zip_path, temp_path, PresetValidationError)
|
||||
else:
|
||||
with zipfile.ZipFile(zip_path, 'r') as zf:
|
||||
temp_path_resolved = temp_path.resolve()
|
||||
for member in zf.namelist():
|
||||
member_path = (temp_path / member).resolve()
|
||||
try:
|
||||
member_path.relative_to(temp_path_resolved)
|
||||
except ValueError:
|
||||
raise PresetValidationError(
|
||||
f"Unsafe path in ZIP archive: {member} "
|
||||
"(potential path traversal)"
|
||||
)
|
||||
zf.extractall(temp_path)
|
||||
|
||||
pack_dir = temp_path
|
||||
manifest_path = pack_dir / "preset.yml"
|
||||
@@ -1649,7 +1656,7 @@ class PresetManager:
|
||||
|
||||
if not manifest_path.exists():
|
||||
raise PresetValidationError(
|
||||
"No preset.yml found in ZIP file"
|
||||
"No preset.yml found in archive"
|
||||
)
|
||||
|
||||
return self.install_from_directory(pack_dir, speckit_version, priority)
|
||||
@@ -2242,14 +2249,18 @@ class PresetCatalog:
|
||||
def download_pack(
|
||||
self, pack_id: str, target_dir: Optional[Path] = None
|
||||
) -> Path:
|
||||
"""Download preset ZIP from catalog.
|
||||
"""Download preset archive from catalog.
|
||||
|
||||
Supports both ZIP (``.zip``) and gzipped tarball (``.tar.gz``/``.tgz``)
|
||||
archives. The format is detected from the download URL's path extension;
|
||||
when ambiguous the ``Content-Type`` header is used as a fallback.
|
||||
|
||||
Args:
|
||||
pack_id: ID of the preset to download
|
||||
target_dir: Directory to save ZIP file (defaults to cache directory)
|
||||
target_dir: Directory to save the archive (defaults to cache directory)
|
||||
|
||||
Returns:
|
||||
Path to downloaded ZIP file
|
||||
Path to downloaded archive file
|
||||
|
||||
Raises:
|
||||
PresetError: If pack not found or download fails
|
||||
@@ -2301,22 +2312,61 @@ class PresetCatalog:
|
||||
target_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
version = pack_info.get("version", "unknown")
|
||||
zip_filename = f"{pack_id}-{version}.zip"
|
||||
zip_path = target_dir / zip_filename
|
||||
|
||||
# Determine the archive format from the post-redirect URL first
|
||||
# (with Content-Type fallback); only use the original `download_url`
|
||||
# as a last hint if the final URL gives no signal.
|
||||
final_url = download_url
|
||||
archive_fmt = ""
|
||||
try:
|
||||
with self._open_url(download_url, timeout=60) as response:
|
||||
zip_data = response.read()
|
||||
|
||||
zip_path.write_bytes(zip_data)
|
||||
return zip_path
|
||||
final_url = response.geturl()
|
||||
# Re-validate scheme after any redirect to guard against
|
||||
# scheme-downgrade. Validate BEFORE reading the body so a
|
||||
# malicious redirect cannot cause us to fetch the payload
|
||||
# over an insecure scheme.
|
||||
_final_parsed = urlparse(final_url)
|
||||
_final_is_localhost = _final_parsed.hostname in (
|
||||
"localhost",
|
||||
"127.0.0.1",
|
||||
"::1",
|
||||
)
|
||||
if _final_parsed.scheme != "https" and not (
|
||||
_final_parsed.scheme == "http" and _final_is_localhost
|
||||
):
|
||||
raise PresetError(
|
||||
f"Preset download URL was redirected to a non-HTTPS URL: {final_url}"
|
||||
)
|
||||
content_type = response.headers.get("Content-Type", "")
|
||||
archive_fmt = detect_archive_format(final_url, content_type)
|
||||
if not archive_fmt:
|
||||
archive_fmt = detect_archive_format(download_url)
|
||||
archive_data = response.read()
|
||||
|
||||
except urllib.error.URLError as e:
|
||||
raise PresetError(
|
||||
f"Failed to download preset from {download_url}: {e}"
|
||||
)
|
||||
except IOError as e:
|
||||
raise PresetError(f"Failed to save preset ZIP: {e}")
|
||||
raise PresetError(f"Failed to read preset archive from {download_url}: {e}")
|
||||
|
||||
# Choose file extension based on detected format.
|
||||
if not archive_fmt:
|
||||
raise PresetError(
|
||||
f"Could not determine archive format for {download_url}. "
|
||||
"Ensure the URL points to a .zip or .tar.gz/.tgz file."
|
||||
)
|
||||
if archive_fmt == "tar.gz":
|
||||
archive_filename = f"{pack_id}-{version}.tar.gz"
|
||||
else:
|
||||
archive_filename = f"{pack_id}-{version}.zip"
|
||||
|
||||
archive_path = target_dir / archive_filename
|
||||
try:
|
||||
archive_path.write_bytes(archive_data)
|
||||
except IOError as e:
|
||||
raise PresetError(f"Failed to save preset archive: {e}")
|
||||
return archive_path
|
||||
|
||||
def clear_cache(self):
|
||||
"""Clear all catalog cache files, including per-URL hashed caches."""
|
||||
|
||||
@@ -178,6 +178,47 @@ class TestNormalizePriority:
|
||||
assert normalize_priority("invalid", default=1) == 1
|
||||
|
||||
|
||||
# ===== detect_archive_format Tests =====
|
||||
|
||||
class TestDetectArchiveFormat:
|
||||
"""Test the detect_archive_format helper."""
|
||||
|
||||
def _fmt(self, url, ct=""):
|
||||
from specify_cli.extensions import detect_archive_format
|
||||
return detect_archive_format(url, ct)
|
||||
|
||||
def test_zip_url_extension(self):
|
||||
assert self._fmt("https://example.com/ext-1.0.0.zip") == "zip"
|
||||
|
||||
def test_tar_gz_url_extension(self):
|
||||
assert self._fmt("https://example.com/ext-1.0.0.tar.gz") == "tar.gz"
|
||||
|
||||
def test_tgz_url_extension(self):
|
||||
assert self._fmt("https://example.com/ext-1.0.0.tgz") == "tar.gz"
|
||||
|
||||
def test_zip_uppercase_url_extension(self):
|
||||
assert self._fmt("https://example.com/ext.ZIP") == "zip"
|
||||
|
||||
def test_tar_gz_with_query_string(self):
|
||||
assert self._fmt("https://example.com/ext.tar.gz?token=abc") == "tar.gz"
|
||||
|
||||
def test_zip_content_type_fallback(self):
|
||||
assert self._fmt("https://example.com/download", "application/zip") == "zip"
|
||||
|
||||
def test_gzip_content_type_fallback(self):
|
||||
assert self._fmt("https://example.com/download", "application/gzip") == "tar.gz"
|
||||
|
||||
def test_x_gzip_content_type_fallback(self):
|
||||
assert self._fmt("https://example.com/download", "application/x-gzip") == "tar.gz"
|
||||
|
||||
def test_unknown_returns_empty_string(self):
|
||||
assert self._fmt("https://example.com/workflow.yml") == ""
|
||||
|
||||
def test_url_extension_takes_precedence_over_content_type(self):
|
||||
# URL says .zip — content-type claiming gzip should not override.
|
||||
assert self._fmt("https://example.com/ext.zip", "application/gzip") == "zip"
|
||||
|
||||
|
||||
# ===== ExtensionManifest Tests =====
|
||||
|
||||
class TestExtensionManifest:
|
||||
@@ -1013,6 +1054,97 @@ class TestExtensionManager:
|
||||
assert backup_file.read_text() == "test: config"
|
||||
|
||||
|
||||
# ===== install_from_zip Tarball Tests =====
|
||||
|
||||
class TestInstallFromTarball:
|
||||
"""Tests for install_from_zip accepting .tar.gz/.tgz archives."""
|
||||
|
||||
def _make_tarball(self, dest: Path, extension_dir: Path, nested: bool = False) -> None:
|
||||
"""Create a minimal .tar.gz archive from *extension_dir*."""
|
||||
import tarfile
|
||||
with tarfile.open(dest, "w:gz") as tf:
|
||||
for file_path in extension_dir.rglob("*"):
|
||||
if file_path.is_file():
|
||||
arcname = file_path.relative_to(extension_dir)
|
||||
if nested:
|
||||
arcname = Path("test-ext-v1.0.0") / arcname
|
||||
tf.add(file_path, arcname=str(arcname))
|
||||
|
||||
def test_install_from_tar_gz(self, extension_dir, project_dir, temp_dir):
|
||||
"""install_from_zip should accept a .tar.gz archive."""
|
||||
archive = temp_dir / "test-ext-1.0.0.tar.gz"
|
||||
self._make_tarball(archive, extension_dir)
|
||||
|
||||
manager = ExtensionManager(project_dir)
|
||||
manifest = manager.install_from_zip(archive, "0.1.0")
|
||||
assert manifest.id == "test-ext"
|
||||
assert manager.registry.is_installed("test-ext")
|
||||
|
||||
def test_install_from_tgz(self, extension_dir, project_dir, temp_dir):
|
||||
"""install_from_zip should accept a .tgz archive."""
|
||||
archive = temp_dir / "test-ext-1.0.0.tgz"
|
||||
self._make_tarball(archive, extension_dir)
|
||||
|
||||
manager = ExtensionManager(project_dir)
|
||||
manifest = manager.install_from_zip(archive, "0.1.0")
|
||||
assert manifest.id == "test-ext"
|
||||
assert manager.registry.is_installed("test-ext")
|
||||
|
||||
def test_install_from_tar_gz_nested(self, extension_dir, project_dir, temp_dir):
|
||||
"""install_from_zip should handle a single nested directory inside the tarball."""
|
||||
archive = temp_dir / "test-ext-nested.tar.gz"
|
||||
self._make_tarball(archive, extension_dir, nested=True)
|
||||
|
||||
manager = ExtensionManager(project_dir)
|
||||
manifest = manager.install_from_zip(archive, "0.1.0")
|
||||
assert manifest.id == "test-ext"
|
||||
assert manager.registry.is_installed("test-ext")
|
||||
|
||||
def test_install_from_tar_gz_no_manifest(self, project_dir, temp_dir):
|
||||
"""install_from_zip raises ValidationError when tarball has no extension.yml."""
|
||||
import tarfile
|
||||
import io
|
||||
archive = temp_dir / "bad.tar.gz"
|
||||
with tarfile.open(archive, "w:gz") as tf:
|
||||
data = b"no manifest here"
|
||||
info = tarfile.TarInfo(name="readme.txt")
|
||||
info.size = len(data)
|
||||
tf.addfile(info, io.BytesIO(data))
|
||||
|
||||
manager = ExtensionManager(project_dir)
|
||||
with pytest.raises(ValidationError, match="No extension.yml found"):
|
||||
manager.install_from_zip(archive, "0.1.0")
|
||||
|
||||
def test_install_from_tar_gz_rejects_path_traversal(self, project_dir, temp_dir):
|
||||
"""install_from_zip must reject tarballs with path traversal entries."""
|
||||
import tarfile
|
||||
import io
|
||||
archive = temp_dir / "evil.tar.gz"
|
||||
with tarfile.open(archive, "w:gz") as tf:
|
||||
info = tarfile.TarInfo(name="../../evil.txt")
|
||||
data = b"evil"
|
||||
info.size = len(data)
|
||||
tf.addfile(info, io.BytesIO(data))
|
||||
|
||||
manager = ExtensionManager(project_dir)
|
||||
with pytest.raises(ValidationError, match="Unsafe path"):
|
||||
manager.install_from_zip(archive, "0.1.0")
|
||||
|
||||
def test_install_from_tar_gz_rejects_symlinks(self, project_dir, temp_dir):
|
||||
"""install_from_zip must reject tarballs containing symlinks."""
|
||||
import tarfile
|
||||
archive = temp_dir / "symlink.tar.gz"
|
||||
with tarfile.open(archive, "w:gz") as tf:
|
||||
info = tarfile.TarInfo(name="link")
|
||||
info.type = tarfile.SYMTYPE
|
||||
info.linkname = "/etc/passwd"
|
||||
tf.addfile(info)
|
||||
|
||||
manager = ExtensionManager(project_dir)
|
||||
with pytest.raises(ValidationError, match="Symlinks"):
|
||||
manager.install_from_zip(archive, "0.1.0")
|
||||
|
||||
|
||||
# ===== CommandRegistrar Tests =====
|
||||
|
||||
class TestCommandRegistrar:
|
||||
@@ -2627,6 +2759,7 @@ class TestExtensionCatalog:
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.read.return_value = zip_bytes
|
||||
mock_response.geturl.return_value = "https://github.com/org/repo/releases/download/v1/test-ext.zip"
|
||||
mock_response.__enter__ = lambda s: s
|
||||
mock_response.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
@@ -3529,6 +3662,7 @@ class TestDownloadExtensionBundled:
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.read.return_value = b"fake zip data"
|
||||
mock_response.geturl.return_value = "https://example.com/git-2.0.0.zip"
|
||||
mock_response.__enter__ = lambda s: s
|
||||
mock_response.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
|
||||
@@ -649,6 +649,90 @@ class TestPresetManager:
|
||||
with pytest.raises(PresetValidationError, match="No preset.yml found"):
|
||||
manager.install_from_zip(zip_path, "0.1.5")
|
||||
|
||||
def _make_tarball(self, dest, pack_dir, nested=False):
|
||||
import tarfile
|
||||
with tarfile.open(dest, "w:gz") as tf:
|
||||
for file_path in pack_dir.rglob("*"):
|
||||
if file_path.is_file():
|
||||
arcname = file_path.relative_to(pack_dir)
|
||||
if nested:
|
||||
arcname = Path("test-pack-v1.0.0") / arcname
|
||||
tf.add(file_path, arcname=str(arcname))
|
||||
|
||||
def test_install_from_tar_gz(self, project_dir, pack_dir, temp_dir):
|
||||
"""Test installing a preset from a .tar.gz archive."""
|
||||
archive = temp_dir / "test-pack-1.0.tar.gz"
|
||||
self._make_tarball(archive, pack_dir)
|
||||
|
||||
manager = PresetManager(project_dir)
|
||||
manifest = manager.install_from_zip(archive, "0.1.5")
|
||||
assert manifest.id == "test-pack"
|
||||
assert manager.registry.is_installed("test-pack")
|
||||
|
||||
def test_install_from_tgz(self, project_dir, pack_dir, temp_dir):
|
||||
"""Test installing a preset from a .tgz archive."""
|
||||
archive = temp_dir / "test-pack-1.0.tgz"
|
||||
self._make_tarball(archive, pack_dir)
|
||||
|
||||
manager = PresetManager(project_dir)
|
||||
manifest = manager.install_from_zip(archive, "0.1.5")
|
||||
assert manifest.id == "test-pack"
|
||||
assert manager.registry.is_installed("test-pack")
|
||||
|
||||
def test_install_from_tar_gz_nested(self, project_dir, pack_dir, temp_dir):
|
||||
"""Test installing a preset from a .tar.gz archive with a single nested directory."""
|
||||
archive = temp_dir / "test-pack-nested.tar.gz"
|
||||
self._make_tarball(archive, pack_dir, nested=True)
|
||||
|
||||
manager = PresetManager(project_dir)
|
||||
manifest = manager.install_from_zip(archive, "0.1.5")
|
||||
assert manifest.id == "test-pack"
|
||||
assert manager.registry.is_installed("test-pack")
|
||||
|
||||
def test_install_from_tar_gz_no_manifest(self, project_dir, temp_dir):
|
||||
"""Test installing a preset from a .tar.gz without preset.yml raises error."""
|
||||
import tarfile
|
||||
import io
|
||||
archive = temp_dir / "bad.tar.gz"
|
||||
with tarfile.open(archive, "w:gz") as tf:
|
||||
data = b"no manifest here"
|
||||
info = tarfile.TarInfo(name="readme.txt")
|
||||
info.size = len(data)
|
||||
tf.addfile(info, io.BytesIO(data))
|
||||
|
||||
manager = PresetManager(project_dir)
|
||||
with pytest.raises(PresetValidationError, match="No preset.yml found"):
|
||||
manager.install_from_zip(archive, "0.1.5")
|
||||
|
||||
def test_install_from_tar_gz_rejects_path_traversal(self, project_dir, temp_dir):
|
||||
"""install_from_zip must reject tarballs with path traversal entries."""
|
||||
import tarfile
|
||||
import io
|
||||
archive = temp_dir / "evil.tar.gz"
|
||||
with tarfile.open(archive, "w:gz") as tf:
|
||||
info = tarfile.TarInfo(name="../../evil.txt")
|
||||
data = b"evil"
|
||||
info.size = len(data)
|
||||
tf.addfile(info, io.BytesIO(data))
|
||||
|
||||
manager = PresetManager(project_dir)
|
||||
with pytest.raises(PresetValidationError, match="Unsafe path"):
|
||||
manager.install_from_zip(archive, "0.1.5")
|
||||
|
||||
def test_install_from_tar_gz_rejects_symlinks(self, project_dir, temp_dir):
|
||||
"""install_from_zip must reject tarballs containing symlinks."""
|
||||
import tarfile
|
||||
archive = temp_dir / "symlink.tar.gz"
|
||||
with tarfile.open(archive, "w:gz") as tf:
|
||||
info = tarfile.TarInfo(name="link")
|
||||
info.type = tarfile.SYMTYPE
|
||||
info.linkname = "/etc/passwd"
|
||||
tf.addfile(info)
|
||||
|
||||
manager = PresetManager(project_dir)
|
||||
with pytest.raises(PresetValidationError, match="Symlinks"):
|
||||
manager.install_from_zip(archive, "0.1.5")
|
||||
|
||||
def test_remove(self, project_dir, pack_dir):
|
||||
"""Test removing a preset."""
|
||||
manager = PresetManager(project_dir)
|
||||
@@ -1529,6 +1613,7 @@ class TestPresetCatalog:
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.read.return_value = zip_bytes
|
||||
mock_response.geturl.return_value = "https://github.com/org/repo/releases/download/v1/test-pack.zip"
|
||||
mock_response.__enter__ = lambda s: s
|
||||
mock_response.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
|
||||
@@ -1843,3 +1843,230 @@ steps:
|
||||
assert state.status == RunStatus.COMPLETED
|
||||
assert "do-plan" in state.step_results
|
||||
assert "do-specify" not in state.step_results
|
||||
|
||||
|
||||
# ===== workflow add archive CLI tests =====
|
||||
|
||||
MINIMAL_WORKFLOW_YAML = """\
|
||||
schema_version: "1.0"
|
||||
workflow:
|
||||
id: "arc-workflow"
|
||||
name: "Archive Workflow"
|
||||
version: "1.0.0"
|
||||
description: "Installed from archive"
|
||||
steps:
|
||||
- id: step-one
|
||||
type: shell
|
||||
run: "echo hello"
|
||||
"""
|
||||
|
||||
|
||||
class TestWorkflowAddArchive:
|
||||
"""CLI-level tests for `workflow add` with local archive files."""
|
||||
|
||||
@pytest.fixture
|
||||
def project_dir(self, tmp_path):
|
||||
"""Create a minimal spec-kit project."""
|
||||
specify = tmp_path / ".specify"
|
||||
specify.mkdir()
|
||||
(specify / "workflows").mkdir()
|
||||
return tmp_path
|
||||
|
||||
def _runner_and_app(self):
|
||||
from typer.testing import CliRunner
|
||||
from specify_cli import app
|
||||
return CliRunner(), app
|
||||
|
||||
# -- Local ZIP archive --------------------------------------------------
|
||||
|
||||
def test_workflow_add_local_zip_flat(self, project_dir):
|
||||
"""workflow add installs from a local ZIP with workflow.yml at root."""
|
||||
import zipfile
|
||||
from unittest.mock import patch
|
||||
runner, app = self._runner_and_app()
|
||||
|
||||
archive = project_dir / "workflow.zip"
|
||||
with zipfile.ZipFile(archive, "w") as zf:
|
||||
zf.writestr("workflow.yml", MINIMAL_WORKFLOW_YAML)
|
||||
|
||||
with patch.object(Path, "cwd", return_value=project_dir):
|
||||
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False)
|
||||
|
||||
assert result.exit_code == 0, result.output
|
||||
assert "arc-workflow" in result.output
|
||||
installed = project_dir / ".specify" / "workflows" / "arc-workflow" / "workflow.yml"
|
||||
assert installed.exists()
|
||||
|
||||
def test_workflow_add_local_zip_nested(self, project_dir):
|
||||
"""workflow add installs from a local ZIP with workflow.yml in a subdirectory."""
|
||||
import zipfile
|
||||
from unittest.mock import patch
|
||||
runner, app = self._runner_and_app()
|
||||
|
||||
archive = project_dir / "workflow.zip"
|
||||
with zipfile.ZipFile(archive, "w") as zf:
|
||||
zf.writestr("repo-1.0/workflow.yml", MINIMAL_WORKFLOW_YAML)
|
||||
|
||||
with patch.object(Path, "cwd", return_value=project_dir):
|
||||
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False)
|
||||
|
||||
assert result.exit_code == 0, result.output
|
||||
assert "arc-workflow" in result.output
|
||||
|
||||
def test_workflow_add_local_zip_missing_workflow_yml(self, project_dir):
|
||||
"""workflow add exits with an error when the ZIP has no workflow.yml."""
|
||||
import zipfile
|
||||
from unittest.mock import patch
|
||||
runner, app = self._runner_and_app()
|
||||
|
||||
archive = project_dir / "empty.zip"
|
||||
with zipfile.ZipFile(archive, "w") as zf:
|
||||
zf.writestr("README.md", "nothing here")
|
||||
|
||||
with patch.object(Path, "cwd", return_value=project_dir):
|
||||
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=True)
|
||||
|
||||
assert result.exit_code != 0
|
||||
assert "extract" in result.output.lower() or "workflow" in result.output.lower()
|
||||
|
||||
# -- Local tar.gz archive -----------------------------------------------
|
||||
|
||||
def test_workflow_add_local_tar_gz_flat(self, project_dir):
|
||||
"""workflow add installs from a local .tar.gz with workflow.yml at root."""
|
||||
import tarfile, io
|
||||
from unittest.mock import patch
|
||||
runner, app = self._runner_and_app()
|
||||
|
||||
archive = project_dir / "workflow.tar.gz"
|
||||
with tarfile.open(archive, "w:gz") as tf:
|
||||
data = MINIMAL_WORKFLOW_YAML.encode()
|
||||
info = tarfile.TarInfo(name="workflow.yml")
|
||||
info.size = len(data)
|
||||
tf.addfile(info, io.BytesIO(data))
|
||||
|
||||
with patch.object(Path, "cwd", return_value=project_dir):
|
||||
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False)
|
||||
|
||||
assert result.exit_code == 0, result.output
|
||||
assert "arc-workflow" in result.output
|
||||
installed = project_dir / ".specify" / "workflows" / "arc-workflow" / "workflow.yml"
|
||||
assert installed.exists()
|
||||
|
||||
def test_workflow_add_local_tar_gz_nested(self, project_dir):
|
||||
"""workflow add installs from a local .tar.gz with workflow.yml in a subdirectory."""
|
||||
import tarfile, io
|
||||
from unittest.mock import patch
|
||||
runner, app = self._runner_and_app()
|
||||
|
||||
archive = project_dir / "workflow.tar.gz"
|
||||
with tarfile.open(archive, "w:gz") as tf:
|
||||
data = MINIMAL_WORKFLOW_YAML.encode()
|
||||
info = tarfile.TarInfo(name="repo-1.0/workflow.yml")
|
||||
info.size = len(data)
|
||||
tf.addfile(info, io.BytesIO(data))
|
||||
|
||||
with patch.object(Path, "cwd", return_value=project_dir):
|
||||
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False)
|
||||
|
||||
assert result.exit_code == 0, result.output
|
||||
assert "arc-workflow" in result.output
|
||||
|
||||
def test_workflow_add_local_tgz_flat(self, project_dir):
|
||||
"""workflow add recognises the .tgz extension as a gzipped tarball."""
|
||||
import tarfile, io
|
||||
from unittest.mock import patch
|
||||
runner, app = self._runner_and_app()
|
||||
|
||||
archive = project_dir / "workflow.tgz"
|
||||
with tarfile.open(archive, "w:gz") as tf:
|
||||
data = MINIMAL_WORKFLOW_YAML.encode()
|
||||
info = tarfile.TarInfo(name="workflow.yml")
|
||||
info.size = len(data)
|
||||
tf.addfile(info, io.BytesIO(data))
|
||||
|
||||
with patch.object(Path, "cwd", return_value=project_dir):
|
||||
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False)
|
||||
|
||||
assert result.exit_code == 0, result.output
|
||||
assert "arc-workflow" in result.output
|
||||
|
||||
def test_workflow_add_local_tar_gz_missing_workflow_yml(self, project_dir):
|
||||
"""workflow add exits with an error when the .tar.gz has no workflow.yml."""
|
||||
import tarfile, io
|
||||
from unittest.mock import patch
|
||||
runner, app = self._runner_and_app()
|
||||
|
||||
archive = project_dir / "empty.tar.gz"
|
||||
with tarfile.open(archive, "w:gz") as tf:
|
||||
data = b"nothing"
|
||||
info = tarfile.TarInfo(name="README.md")
|
||||
info.size = len(data)
|
||||
tf.addfile(info, io.BytesIO(data))
|
||||
|
||||
with patch.object(Path, "cwd", return_value=project_dir):
|
||||
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=True)
|
||||
|
||||
assert result.exit_code != 0
|
||||
assert "extract" in result.output.lower() or "workflow" in result.output.lower()
|
||||
|
||||
# -- URL archive download -----------------------------------------------
|
||||
|
||||
def test_workflow_add_url_tar_gz(self, project_dir):
|
||||
"""workflow add downloads a .tar.gz from a URL and installs the workflow."""
|
||||
import tarfile, io
|
||||
from unittest.mock import patch, MagicMock
|
||||
runner, app = self._runner_and_app()
|
||||
|
||||
# Build an in-memory tar.gz archive containing workflow.yml.
|
||||
buf = io.BytesIO()
|
||||
with tarfile.open(fileobj=buf, mode="w:gz") as tf:
|
||||
data = MINIMAL_WORKFLOW_YAML.encode()
|
||||
info = tarfile.TarInfo(name="workflow.yml")
|
||||
info.size = len(data)
|
||||
tf.addfile(info, io.BytesIO(data))
|
||||
raw_bytes = buf.getvalue()
|
||||
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.geturl.return_value = "https://example.com/workflow.tar.gz"
|
||||
mock_resp.headers.get.return_value = "application/gzip"
|
||||
mock_resp.read.return_value = raw_bytes
|
||||
mock_resp.__enter__ = lambda s: s
|
||||
mock_resp.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
with patch("urllib.request.urlopen", return_value=mock_resp), \
|
||||
patch.object(Path, "cwd", return_value=project_dir):
|
||||
result = runner.invoke(
|
||||
app, ["workflow", "add", "https://example.com/workflow.tar.gz"],
|
||||
catch_exceptions=False,
|
||||
)
|
||||
|
||||
assert result.exit_code == 0, result.output
|
||||
assert "arc-workflow" in result.output
|
||||
|
||||
def test_workflow_add_url_zip(self, project_dir):
|
||||
"""workflow add downloads a .zip from a URL and installs the workflow."""
|
||||
import zipfile, io
|
||||
from unittest.mock import patch, MagicMock
|
||||
runner, app = self._runner_and_app()
|
||||
|
||||
buf = io.BytesIO()
|
||||
with zipfile.ZipFile(buf, "w") as zf:
|
||||
zf.writestr("workflow.yml", MINIMAL_WORKFLOW_YAML)
|
||||
raw_bytes = buf.getvalue()
|
||||
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.geturl.return_value = "https://example.com/workflow.zip"
|
||||
mock_resp.headers.get.return_value = "application/zip"
|
||||
mock_resp.read.return_value = raw_bytes
|
||||
mock_resp.__enter__ = lambda s: s
|
||||
mock_resp.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
with patch("urllib.request.urlopen", return_value=mock_resp), \
|
||||
patch.object(Path, "cwd", return_value=project_dir):
|
||||
result = runner.invoke(
|
||||
app, ["workflow", "add", "https://example.com/workflow.zip"],
|
||||
catch_exceptions=False,
|
||||
)
|
||||
|
||||
assert result.exit_code == 0, result.output
|
||||
assert "arc-workflow" in result.output
|
||||
|
||||
Reference in New Issue
Block a user