mirror of
https://github.com/github/spec-kit.git
synced 2026-07-05 13:34:06 +08:00
Compare commits
23 Commits
copilot/fi
...
copilot/ad
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5d75366fd5 | ||
|
|
7344071b7e | ||
|
|
a69d427e03 | ||
|
|
a8320d9b61 | ||
|
|
0825f508a7 | ||
|
|
eec1291896 | ||
|
|
7ff9c8bdd4 | ||
|
|
1015ff24da | ||
|
|
05798a9e70 | ||
|
|
bd04937927 | ||
|
|
1786d27c06 | ||
|
|
0a02369ebe | ||
|
|
e0495ebc38 | ||
|
|
cb87a410f8 | ||
|
|
0fd0bf6b9f | ||
|
|
d00509e770 | ||
|
|
c44cc245ed | ||
|
|
0c6cc4502c | ||
|
|
d78ead1802 | ||
|
|
b3a60f5fba | ||
|
|
b37f117cf9 | ||
|
|
a434e5a8ed | ||
|
|
1bda2f0cb4 |
@@ -2576,7 +2576,7 @@ def preset_list():
|
|||||||
@preset_app.command("add")
|
@preset_app.command("add")
|
||||||
def preset_add(
|
def preset_add(
|
||||||
preset_id: str = typer.Argument(None, help="Preset ID to install from catalog"),
|
preset_id: str = typer.Argument(None, help="Preset ID to install from catalog"),
|
||||||
from_url: str = typer.Option(None, "--from", help="Install from a URL (ZIP file)"),
|
from_url: str = typer.Option(None, "--from", help="Install from a URL (ZIP or .tar.gz/.tgz archive)"),
|
||||||
dev: str = typer.Option(None, "--dev", help="Install from local directory (development mode)"),
|
dev: str = typer.Option(None, "--dev", help="Install from local directory (development mode)"),
|
||||||
priority: int = typer.Option(10, "--priority", help="Resolution priority (lower = higher precedence, default 10)"),
|
priority: int = typer.Option(10, "--priority", help="Resolution priority (lower = higher precedence, default 10)"),
|
||||||
):
|
):
|
||||||
@@ -2629,17 +2629,46 @@ def preset_add(
|
|||||||
import urllib.request
|
import urllib.request
|
||||||
import urllib.error
|
import urllib.error
|
||||||
import tempfile
|
import tempfile
|
||||||
|
from .extensions import detect_archive_format as _det_fmt
|
||||||
|
|
||||||
with tempfile.TemporaryDirectory() as tmpdir:
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
zip_path = Path(tmpdir) / "preset.zip"
|
final_url = from_url
|
||||||
|
archive_fmt = ""
|
||||||
try:
|
try:
|
||||||
with urllib.request.urlopen(from_url, timeout=60) as response:
|
with urllib.request.urlopen(from_url, timeout=60) as response:
|
||||||
zip_path.write_bytes(response.read())
|
final_url = response.geturl()
|
||||||
|
# Re-validate scheme after any redirect (scheme-downgrade
|
||||||
|
# guard). Check BEFORE reading the body so an insecure
|
||||||
|
# redirect cannot cause us to fetch the payload.
|
||||||
|
_fp = _urlparse(final_url)
|
||||||
|
_fl = _fp.hostname in ("localhost", "127.0.0.1", "::1")
|
||||||
|
if _fp.scheme != "https" and not (_fp.scheme == "http" and _fl):
|
||||||
|
console.print(f"[red]Error:[/red] URL was redirected to a non-HTTPS URL: {final_url}")
|
||||||
|
raise typer.Exit(1)
|
||||||
|
content_type = response.headers.get("Content-Type", "")
|
||||||
|
# Prefer the post-redirect URL for format detection;
|
||||||
|
# fall back to the original URL only as a last hint.
|
||||||
|
archive_fmt = _det_fmt(final_url, content_type)
|
||||||
|
if not archive_fmt:
|
||||||
|
archive_fmt = _det_fmt(from_url)
|
||||||
|
archive_data = response.read()
|
||||||
except urllib.error.URLError as e:
|
except urllib.error.URLError as e:
|
||||||
console.print(f"[red]Error:[/red] Failed to download: {e}")
|
console.print(f"[red]Error:[/red] Failed to download: {e}")
|
||||||
raise typer.Exit(1)
|
raise typer.Exit(1)
|
||||||
|
|
||||||
manifest = manager.install_from_zip(zip_path, speckit_version, priority)
|
if not archive_fmt:
|
||||||
|
console.print("[red]Error:[/red] Could not determine archive format from URL or Content-Type.")
|
||||||
|
console.print("Ensure the URL points to a .zip or .tar.gz/.tgz file.")
|
||||||
|
raise typer.Exit(1)
|
||||||
|
|
||||||
|
suffix = ".tar.gz" if archive_fmt == "tar.gz" else ".zip"
|
||||||
|
archive_path = Path(tmpdir) / f"preset{suffix}"
|
||||||
|
try:
|
||||||
|
archive_path.write_bytes(archive_data)
|
||||||
|
manifest = manager.install_from_zip(archive_path, speckit_version, priority)
|
||||||
|
except OSError as e:
|
||||||
|
console.print(f"[red]Error:[/red] Failed to save or install archive: {e}")
|
||||||
|
raise typer.Exit(1)
|
||||||
|
|
||||||
console.print(f"[green]✓[/green] Preset '{manifest.name}' v{manifest.version} installed (priority {priority})")
|
console.print(f"[green]✓[/green] Preset '{manifest.name}' v{manifest.version} installed (priority {priority})")
|
||||||
|
|
||||||
@@ -3573,7 +3602,7 @@ def catalog_remove(
|
|||||||
def extension_add(
|
def extension_add(
|
||||||
extension: str = typer.Argument(help="Extension name or path"),
|
extension: str = typer.Argument(help="Extension name or path"),
|
||||||
dev: bool = typer.Option(False, "--dev", help="Install from local directory"),
|
dev: bool = typer.Option(False, "--dev", help="Install from local directory"),
|
||||||
from_url: Optional[str] = typer.Option(None, "--from", help="Install from custom URL"),
|
from_url: Optional[str] = typer.Option(None, "--from", help="Install from custom URL (ZIP or .tar.gz/.tgz archive)"),
|
||||||
priority: int = typer.Option(10, "--priority", help="Resolution priority (lower = higher precedence, default 10)"),
|
priority: int = typer.Option(10, "--priority", help="Resolution priority (lower = higher precedence, default 10)"),
|
||||||
):
|
):
|
||||||
"""Install an extension."""
|
"""Install an extension."""
|
||||||
@@ -3612,10 +3641,11 @@ def extension_add(
|
|||||||
manifest = manager.install_from_directory(source_path, speckit_version, priority=priority)
|
manifest = manager.install_from_directory(source_path, speckit_version, priority=priority)
|
||||||
|
|
||||||
elif from_url:
|
elif from_url:
|
||||||
# Install from URL (ZIP file)
|
# Install from URL (ZIP or tar.gz archive)
|
||||||
import urllib.request
|
import urllib.request
|
||||||
import urllib.error
|
import urllib.error
|
||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
|
from .extensions import detect_archive_format
|
||||||
|
|
||||||
# Validate URL
|
# Validate URL
|
||||||
parsed = urlparse(from_url)
|
parsed = urlparse(from_url)
|
||||||
@@ -3631,25 +3661,53 @@ def extension_add(
|
|||||||
console.print("Only install extensions from sources you trust.\n")
|
console.print("Only install extensions from sources you trust.\n")
|
||||||
console.print(f"Downloading from {from_url}...")
|
console.print(f"Downloading from {from_url}...")
|
||||||
|
|
||||||
# Download ZIP to temp location
|
# Download archive to temp location; detect format from the
|
||||||
|
# post-redirect URL (with Content-Type fallback), only using
|
||||||
|
# the original URL as a last hint.
|
||||||
download_dir = project_root / ".specify" / "extensions" / ".cache" / "downloads"
|
download_dir = project_root / ".specify" / "extensions" / ".cache" / "downloads"
|
||||||
download_dir.mkdir(parents=True, exist_ok=True)
|
download_dir.mkdir(parents=True, exist_ok=True)
|
||||||
zip_path = download_dir / f"{extension}-url-download.zip"
|
archive_fmt = ""
|
||||||
|
archive_path = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with urllib.request.urlopen(from_url, timeout=60) as response:
|
with urllib.request.urlopen(from_url, timeout=60) as response:
|
||||||
zip_data = response.read()
|
final_url = response.geturl()
|
||||||
zip_path.write_bytes(zip_data)
|
# Re-validate scheme after any redirect (scheme-downgrade
|
||||||
|
# guard). Check BEFORE reading the body so an insecure
|
||||||
|
# redirect cannot cause us to fetch the payload.
|
||||||
|
_fp = urlparse(final_url)
|
||||||
|
_fl = _fp.hostname in ("localhost", "127.0.0.1", "::1")
|
||||||
|
if _fp.scheme != "https" and not (_fp.scheme == "http" and _fl):
|
||||||
|
console.print(f"[red]Error:[/red] URL was redirected to a non-HTTPS URL: {final_url}")
|
||||||
|
raise typer.Exit(1)
|
||||||
|
content_type = response.headers.get("Content-Type", "")
|
||||||
|
archive_fmt = detect_archive_format(final_url, content_type)
|
||||||
|
if not archive_fmt:
|
||||||
|
archive_fmt = detect_archive_format(from_url)
|
||||||
|
archive_data = response.read()
|
||||||
|
|
||||||
# Install from downloaded ZIP
|
if not archive_fmt:
|
||||||
manifest = manager.install_from_zip(zip_path, speckit_version, priority=priority)
|
console.print("[red]Error:[/red] Could not determine archive format from URL or Content-Type.")
|
||||||
|
console.print("Ensure the URL points to a .zip or .tar.gz/.tgz file.")
|
||||||
|
raise typer.Exit(1)
|
||||||
|
|
||||||
|
suffix = ".tar.gz" if archive_fmt == "tar.gz" else ".zip"
|
||||||
|
safe_name = Path(extension).name or "extension"
|
||||||
|
archive_path = download_dir / f"{safe_name}-url-download{suffix}"
|
||||||
|
archive_path.write_bytes(archive_data)
|
||||||
|
|
||||||
|
# Install from downloaded archive
|
||||||
|
manifest = manager.install_from_zip(archive_path, speckit_version, priority=priority)
|
||||||
except urllib.error.URLError as e:
|
except urllib.error.URLError as e:
|
||||||
console.print(f"[red]Error:[/red] Failed to download from {from_url}: {e}")
|
console.print(f"[red]Error:[/red] Failed to download from {from_url}: {e}")
|
||||||
raise typer.Exit(1)
|
raise typer.Exit(1)
|
||||||
|
except OSError as e:
|
||||||
|
console.print(f"[red]Error:[/red] Failed to save or install archive: {e}")
|
||||||
|
raise typer.Exit(1)
|
||||||
finally:
|
finally:
|
||||||
# Clean up downloaded ZIP
|
# Clean up the downloaded archive
|
||||||
if zip_path.exists():
|
if archive_path is not None and archive_path.exists():
|
||||||
zip_path.unlink()
|
archive_path.unlink()
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# Try bundled extensions first (shipped with spec-kit)
|
# Try bundled extensions first (shipped with spec-kit)
|
||||||
@@ -4301,29 +4359,55 @@ def extension_update(
|
|||||||
backup_hooks[hook_name] = ext_hooks
|
backup_hooks[hook_name] = ext_hooks
|
||||||
|
|
||||||
# 5. Download new version
|
# 5. Download new version
|
||||||
zip_path = catalog.download_extension(extension_id)
|
archive_path = catalog.download_extension(extension_id)
|
||||||
try:
|
try:
|
||||||
# 6. Validate extension ID from ZIP BEFORE modifying installation
|
# 6. Validate extension ID from archive BEFORE modifying installation
|
||||||
# Handle both root-level and nested extension.yml (GitHub auto-generated ZIPs)
|
# Handle both root-level and nested extension.yml (GitHub auto-generated archives)
|
||||||
with zipfile.ZipFile(zip_path, "r") as zf:
|
from .extensions import detect_archive_format
|
||||||
import yaml
|
import tarfile
|
||||||
manifest_data = None
|
archive_fmt = detect_archive_format(str(archive_path))
|
||||||
namelist = zf.namelist()
|
import yaml
|
||||||
|
manifest_data = None
|
||||||
|
|
||||||
# First try root-level extension.yml
|
if archive_fmt == "tar.gz":
|
||||||
if "extension.yml" in namelist:
|
with tarfile.open(archive_path, "r:gz") as tf:
|
||||||
with zf.open("extension.yml") as f:
|
# First try root-level extension.yml
|
||||||
manifest_data = yaml.safe_load(f) or {}
|
try:
|
||||||
else:
|
m = tf.getmember("extension.yml")
|
||||||
# Look for extension.yml in a single top-level subdirectory
|
f = tf.extractfile(m)
|
||||||
# (e.g., "repo-name-branch/extension.yml")
|
if f is not None:
|
||||||
manifest_paths = [n for n in namelist if n.endswith("/extension.yml") and n.count("/") == 1]
|
with f:
|
||||||
if len(manifest_paths) == 1:
|
manifest_data = yaml.safe_load(f.read()) or {}
|
||||||
with zf.open(manifest_paths[0]) as f:
|
except KeyError:
|
||||||
|
# extension.yml not present at archive root; use nested fallback below.
|
||||||
|
manifest_data = None
|
||||||
|
# Fall back to nested-directory search if root-level
|
||||||
|
# was missing (KeyError) or not a regular file (None).
|
||||||
|
if manifest_data is None:
|
||||||
|
members = [m for m in tf.getmembers() if m.name.endswith("/extension.yml") and m.name.count("/") == 1]
|
||||||
|
if len(members) == 1:
|
||||||
|
f = tf.extractfile(members[0])
|
||||||
|
if f is not None:
|
||||||
|
with f:
|
||||||
|
manifest_data = yaml.safe_load(f.read()) or {}
|
||||||
|
else:
|
||||||
|
with zipfile.ZipFile(archive_path, "r") as zf:
|
||||||
|
namelist = zf.namelist()
|
||||||
|
|
||||||
|
# First try root-level extension.yml
|
||||||
|
if "extension.yml" in namelist:
|
||||||
|
with zf.open("extension.yml") as f:
|
||||||
manifest_data = yaml.safe_load(f) or {}
|
manifest_data = yaml.safe_load(f) or {}
|
||||||
|
else:
|
||||||
|
# Look for extension.yml in a single top-level subdirectory
|
||||||
|
# (e.g., "repo-name-branch/extension.yml")
|
||||||
|
manifest_paths = [n for n in namelist if n.endswith("/extension.yml") and n.count("/") == 1]
|
||||||
|
if len(manifest_paths) == 1:
|
||||||
|
with zf.open(manifest_paths[0]) as f:
|
||||||
|
manifest_data = yaml.safe_load(f) or {}
|
||||||
|
|
||||||
if manifest_data is None:
|
if manifest_data is None:
|
||||||
raise ValueError("Downloaded extension archive is missing 'extension.yml'")
|
raise ValueError("Downloaded extension archive is missing 'extension.yml'")
|
||||||
|
|
||||||
zip_extension_id = manifest_data.get("extension", {}).get("id")
|
zip_extension_id = manifest_data.get("extension", {}).get("id")
|
||||||
if zip_extension_id != extension_id:
|
if zip_extension_id != extension_id:
|
||||||
@@ -4335,7 +4419,7 @@ def extension_update(
|
|||||||
manager.remove(extension_id, keep_config=True)
|
manager.remove(extension_id, keep_config=True)
|
||||||
|
|
||||||
# 8. Install new version
|
# 8. Install new version
|
||||||
_ = manager.install_from_zip(zip_path, speckit_version)
|
_ = manager.install_from_zip(archive_path, speckit_version)
|
||||||
|
|
||||||
# Restore user config files from backup after successful install.
|
# Restore user config files from backup after successful install.
|
||||||
new_extension_dir = manager.extensions_dir / extension_id
|
new_extension_dir = manager.extensions_dir / extension_id
|
||||||
@@ -4381,9 +4465,9 @@ def extension_update(
|
|||||||
hook["enabled"] = False
|
hook["enabled"] = False
|
||||||
hook_executor.save_project_config(config)
|
hook_executor.save_project_config(config)
|
||||||
finally:
|
finally:
|
||||||
# Clean up downloaded ZIP
|
# Clean up downloaded archive
|
||||||
if zip_path.exists():
|
if archive_path.exists():
|
||||||
zip_path.unlink()
|
archive_path.unlink()
|
||||||
|
|
||||||
# 10. Clean up backup on success
|
# 10. Clean up backup on success
|
||||||
if backup_base.exists():
|
if backup_base.exists():
|
||||||
@@ -4875,6 +4959,59 @@ def workflow_list():
|
|||||||
console.print()
|
console.print()
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_workflow_yml(archive_path: Path, archive_fmt: str) -> bytes:
|
||||||
|
"""Extract ``workflow.yml`` from a ZIP or ``.tar.gz`` archive.
|
||||||
|
|
||||||
|
Searches the archive root and a single nested top-level subdirectory
|
||||||
|
(e.g., ``repo-name-1.0/workflow.yml``).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
archive_path: Path to the downloaded archive.
|
||||||
|
archive_fmt: ``"zip"`` or ``"tar.gz"``.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Raw bytes of the ``workflow.yml`` file.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: If no ``workflow.yml`` is found in the archive.
|
||||||
|
"""
|
||||||
|
import tarfile
|
||||||
|
|
||||||
|
if archive_fmt == "tar.gz":
|
||||||
|
with tarfile.open(archive_path, "r:gz") as tf:
|
||||||
|
# Try root-level first.
|
||||||
|
try:
|
||||||
|
f = tf.extractfile(tf.getmember("workflow.yml"))
|
||||||
|
if f is not None:
|
||||||
|
with f:
|
||||||
|
return f.read()
|
||||||
|
except KeyError:
|
||||||
|
pass # Root-level workflow.yml not found; fall through to subdirectory search below.
|
||||||
|
# Look in a single top-level subdirectory.
|
||||||
|
candidates = [
|
||||||
|
m for m in tf.getmembers()
|
||||||
|
if m.name.endswith("/workflow.yml") and m.name.count("/") == 1
|
||||||
|
]
|
||||||
|
if len(candidates) == 1:
|
||||||
|
f = tf.extractfile(candidates[0])
|
||||||
|
if f is not None:
|
||||||
|
with f:
|
||||||
|
return f.read()
|
||||||
|
else:
|
||||||
|
with zipfile.ZipFile(archive_path, "r") as zf:
|
||||||
|
namelist = zf.namelist()
|
||||||
|
if "workflow.yml" in namelist:
|
||||||
|
return zf.read("workflow.yml")
|
||||||
|
candidates = [
|
||||||
|
n for n in namelist
|
||||||
|
if n.endswith("/workflow.yml") and n.count("/") == 1
|
||||||
|
]
|
||||||
|
if len(candidates) == 1:
|
||||||
|
return zf.read(candidates[0])
|
||||||
|
|
||||||
|
raise ValueError("No workflow.yml found in the downloaded archive")
|
||||||
|
|
||||||
|
|
||||||
@workflow_app.command("add")
|
@workflow_app.command("add")
|
||||||
def workflow_add(
|
def workflow_add(
|
||||||
source: str = typer.Argument(..., help="Workflow ID, URL, or local path"),
|
source: str = typer.Argument(..., help="Workflow ID, URL, or local path"),
|
||||||
@@ -4928,6 +5065,7 @@ def workflow_add(
|
|||||||
from ipaddress import ip_address
|
from ipaddress import ip_address
|
||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
from urllib.request import urlopen # noqa: S310
|
from urllib.request import urlopen # noqa: S310
|
||||||
|
from .extensions import detect_archive_format
|
||||||
|
|
||||||
parsed_src = urlparse(source)
|
parsed_src = urlparse(source)
|
||||||
src_host = parsed_src.hostname or ""
|
src_host = parsed_src.hostname or ""
|
||||||
@@ -4958,18 +5096,53 @@ def workflow_add(
|
|||||||
if final_parsed.scheme != "https" and not (final_parsed.scheme == "http" and final_lb):
|
if final_parsed.scheme != "https" and not (final_parsed.scheme == "http" and final_lb):
|
||||||
console.print(f"[red]Error:[/red] URL redirected to non-HTTPS: {final_url}")
|
console.print(f"[red]Error:[/red] URL redirected to non-HTTPS: {final_url}")
|
||||||
raise typer.Exit(1)
|
raise typer.Exit(1)
|
||||||
with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp:
|
|
||||||
tmp.write(resp.read())
|
# Detect archive format from the final URL or Content-Type header.
|
||||||
tmp_path = Path(tmp.name)
|
archive_fmt = detect_archive_format(final_url)
|
||||||
|
if not archive_fmt:
|
||||||
|
content_type = resp.headers.get("Content-Type", "")
|
||||||
|
archive_fmt = detect_archive_format(final_url, content_type)
|
||||||
|
|
||||||
|
raw_data = resp.read()
|
||||||
except typer.Exit:
|
except typer.Exit:
|
||||||
raise
|
raise
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
console.print(f"[red]Error:[/red] Failed to download workflow: {exc}")
|
console.print(f"[red]Error:[/red] Failed to download workflow: {exc}")
|
||||||
raise typer.Exit(1)
|
raise typer.Exit(1)
|
||||||
|
|
||||||
|
tmp_path = None
|
||||||
|
try:
|
||||||
|
if archive_fmt in ("tar.gz", "zip"):
|
||||||
|
# Extract workflow.yml from the archive.
|
||||||
|
suffix = ".tar.gz" if archive_fmt == "tar.gz" else ".zip"
|
||||||
|
arc_tmp_path = None
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as arc_tmp:
|
||||||
|
arc_tmp_path = Path(arc_tmp.name)
|
||||||
|
arc_tmp.write(raw_data)
|
||||||
|
try:
|
||||||
|
wf_yaml = _extract_workflow_yml(arc_tmp_path, archive_fmt)
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp:
|
||||||
|
tmp_path = Path(tmp.name)
|
||||||
|
tmp.write(wf_yaml)
|
||||||
|
finally:
|
||||||
|
if arc_tmp_path is not None:
|
||||||
|
arc_tmp_path.unlink(missing_ok=True)
|
||||||
|
else:
|
||||||
|
# Treat as a plain YAML file (existing behaviour).
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp:
|
||||||
|
tmp.write(raw_data)
|
||||||
|
tmp_path = Path(tmp.name)
|
||||||
|
except typer.Exit:
|
||||||
|
raise
|
||||||
|
except Exception as exc:
|
||||||
|
console.print(f"[red]Error:[/red] Failed to process downloaded workflow: {exc}")
|
||||||
|
raise typer.Exit(1)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
_validate_and_install_local(tmp_path, source)
|
_validate_and_install_local(tmp_path, source)
|
||||||
finally:
|
finally:
|
||||||
tmp_path.unlink(missing_ok=True)
|
if tmp_path is not None:
|
||||||
|
tmp_path.unlink(missing_ok=True)
|
||||||
return
|
return
|
||||||
|
|
||||||
# Try as a local file/directory
|
# Try as a local file/directory
|
||||||
@@ -4978,6 +5151,27 @@ def workflow_add(
|
|||||||
if source_path.is_file() and source_path.suffix in (".yml", ".yaml"):
|
if source_path.is_file() and source_path.suffix in (".yml", ".yaml"):
|
||||||
_validate_and_install_local(source_path, str(source_path))
|
_validate_and_install_local(source_path, str(source_path))
|
||||||
return
|
return
|
||||||
|
elif source_path.is_file() and (
|
||||||
|
source.lower().endswith(".tar.gz") or source.lower().endswith(".tgz") or source.lower().endswith(".zip")
|
||||||
|
):
|
||||||
|
# Local archive file containing workflow.yml
|
||||||
|
from .extensions import detect_archive_format
|
||||||
|
local_fmt = detect_archive_format(source)
|
||||||
|
try:
|
||||||
|
wf_yaml = _extract_workflow_yml(source_path, local_fmt)
|
||||||
|
except Exception as exc:
|
||||||
|
console.print(f"[red]Error:[/red] Failed to extract workflow from archive: {exc}")
|
||||||
|
raise typer.Exit(1)
|
||||||
|
import tempfile
|
||||||
|
tmp_local = None
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp:
|
||||||
|
tmp_local = Path(tmp.name)
|
||||||
|
tmp.write(wf_yaml)
|
||||||
|
try:
|
||||||
|
_validate_and_install_local(tmp_local, str(source_path))
|
||||||
|
finally:
|
||||||
|
tmp_local.unlink(missing_ok=True)
|
||||||
|
return
|
||||||
elif source_path.is_dir():
|
elif source_path.is_dir():
|
||||||
wf_file = source_path / "workflow.yml"
|
wf_file = source_path / "workflow.yml"
|
||||||
if not wf_file.exists():
|
if not wf_file.exists():
|
||||||
@@ -5041,6 +5235,7 @@ def workflow_add(
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
from urllib.request import urlopen # noqa: S310 — URL comes from catalog
|
from urllib.request import urlopen # noqa: S310 — URL comes from catalog
|
||||||
|
from .extensions import detect_archive_format
|
||||||
|
|
||||||
workflow_dir.mkdir(parents=True, exist_ok=True)
|
workflow_dir.mkdir(parents=True, exist_ok=True)
|
||||||
with urlopen(workflow_url, timeout=30) as response: # noqa: S310
|
with urlopen(workflow_url, timeout=30) as response: # noqa: S310
|
||||||
@@ -5063,7 +5258,32 @@ def workflow_add(
|
|||||||
f"[red]Error:[/red] Workflow '{source}' redirected to non-HTTPS URL: {final_url}"
|
f"[red]Error:[/red] Workflow '{source}' redirected to non-HTTPS URL: {final_url}"
|
||||||
)
|
)
|
||||||
raise typer.Exit(1)
|
raise typer.Exit(1)
|
||||||
workflow_file.write_bytes(response.read())
|
|
||||||
|
# Detect archive format from the final URL or Content-Type header.
|
||||||
|
cat_archive_fmt = detect_archive_format(final_url)
|
||||||
|
if not cat_archive_fmt:
|
||||||
|
cat_ct = response.headers.get("Content-Type", "")
|
||||||
|
cat_archive_fmt = detect_archive_format(final_url, cat_ct)
|
||||||
|
|
||||||
|
raw_response = response.read()
|
||||||
|
|
||||||
|
if cat_archive_fmt in ("tar.gz", "zip"):
|
||||||
|
# Download URL points to an archive — extract workflow.yml from it.
|
||||||
|
suffix = ".tar.gz" if cat_archive_fmt == "tar.gz" else ".zip"
|
||||||
|
arc_tmp = None
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as arc_f:
|
||||||
|
arc_tmp = Path(arc_f.name)
|
||||||
|
arc_f.write(raw_response)
|
||||||
|
try:
|
||||||
|
wf_yaml_bytes = _extract_workflow_yml(arc_tmp, cat_archive_fmt)
|
||||||
|
finally:
|
||||||
|
if arc_tmp is not None:
|
||||||
|
arc_tmp.unlink(missing_ok=True)
|
||||||
|
workflow_file.write_bytes(wf_yaml_bytes)
|
||||||
|
else:
|
||||||
|
workflow_file.write_bytes(raw_response)
|
||||||
|
except typer.Exit:
|
||||||
|
raise
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
if workflow_dir.exists():
|
if workflow_dir.exists():
|
||||||
import shutil
|
import shutil
|
||||||
|
|||||||
@@ -9,6 +9,8 @@ without bloating the core framework.
|
|||||||
import json
|
import json
|
||||||
import hashlib
|
import hashlib
|
||||||
import os
|
import os
|
||||||
|
import sys
|
||||||
|
import tarfile
|
||||||
import tempfile
|
import tempfile
|
||||||
import zipfile
|
import zipfile
|
||||||
import shutil
|
import shutil
|
||||||
@@ -106,6 +108,137 @@ def normalize_priority(value: Any, default: int = 10) -> int:
|
|||||||
return priority if priority >= 1 else default
|
return priority if priority >= 1 else default
|
||||||
|
|
||||||
|
|
||||||
|
def detect_archive_format(url: str, content_type: str = "") -> str:
|
||||||
|
"""Detect archive format from URL path extension or Content-Type header.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
url: URL or file path to inspect.
|
||||||
|
content_type: Optional ``Content-Type`` header value from the HTTP response.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
``"zip"`` for ZIP archives, ``"tar.gz"`` for gzipped tarballs, or ``""``
|
||||||
|
when the format cannot be determined.
|
||||||
|
"""
|
||||||
|
# Strip query-string / fragment before examining the path extension.
|
||||||
|
url_path = url.split("?")[0].split("#")[0].lower()
|
||||||
|
if url_path.endswith(".zip"):
|
||||||
|
return "zip"
|
||||||
|
if url_path.endswith(".tar.gz") or url_path.endswith(".tgz"):
|
||||||
|
return "tar.gz"
|
||||||
|
|
||||||
|
# Fall back to Content-Type header inspection.
|
||||||
|
ct = content_type.lower()
|
||||||
|
if "application/zip" in ct or "application/x-zip" in ct:
|
||||||
|
return "zip"
|
||||||
|
if any(
|
||||||
|
t in ct
|
||||||
|
for t in (
|
||||||
|
"application/gzip",
|
||||||
|
"application/x-gzip",
|
||||||
|
"application/x-tar+gzip",
|
||||||
|
)
|
||||||
|
):
|
||||||
|
return "tar.gz"
|
||||||
|
|
||||||
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
def safe_extract_tarball(
|
||||||
|
archive_path: Path,
|
||||||
|
dest_dir: Path,
|
||||||
|
error_class: "type[Exception]" = Exception,
|
||||||
|
) -> None:
|
||||||
|
"""Safely extract a ``.tar.gz`` or ``.tgz`` archive into *dest_dir*.
|
||||||
|
|
||||||
|
All members are validated before extraction to prevent *tar slip*
|
||||||
|
(path traversal) attacks. Symlinks, hard links, and special files
|
||||||
|
(devices, FIFOs, etc.) are rejected.
|
||||||
|
|
||||||
|
On Python 3.12 and later the ``"data"`` extraction filter is applied
|
||||||
|
for an additional layer of OS-level protection. On earlier versions
|
||||||
|
the explicit member list (containing only pre-validated regular files
|
||||||
|
and directories) is passed to ``extractall()`` — since all symlinks are
|
||||||
|
already rejected in the validation phase, no archive-introduced symlink
|
||||||
|
can be followed during extraction.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
archive_path: Path to the ``.tar.gz``/``.tgz`` archive.
|
||||||
|
dest_dir: Destination directory (must already exist).
|
||||||
|
error_class: Exception class to raise on unsafe entries.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
error_class: If any member is unsafe or the archive cannot be read.
|
||||||
|
"""
|
||||||
|
dest_resolved = dest_dir.resolve()
|
||||||
|
# Tar metadata member types to skip during validation — they carry no
|
||||||
|
# extractable payload and are generated automatically by many common
|
||||||
|
# archiving tools (e.g. PAX headers, GNU longname/longlink entries).
|
||||||
|
# GNUTYPE_SPARSE is intentionally excluded: it carries a real file payload
|
||||||
|
# and tarfile.TarInfo.isreg() returns True for it, so it passes the
|
||||||
|
# regular-file check below and is extracted correctly.
|
||||||
|
_TAR_METADATA_TYPES = (
|
||||||
|
tarfile.XHDTYPE, # PAX extended header
|
||||||
|
tarfile.XGLTYPE, # PAX global extended header
|
||||||
|
tarfile.SOLARIS_XHDTYPE, # Solaris PAX extended header
|
||||||
|
tarfile.GNUTYPE_LONGNAME, # GNU long path name (metadata only)
|
||||||
|
tarfile.GNUTYPE_LONGLINK, # GNU long link name (metadata only)
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
with tarfile.open(archive_path, "r:gz") as tf:
|
||||||
|
members = tf.getmembers()
|
||||||
|
safe_members = []
|
||||||
|
|
||||||
|
# Validate every member before extracting anything.
|
||||||
|
for member in members:
|
||||||
|
# Reject absolute paths and any path component that is "..".
|
||||||
|
if os.path.isabs(member.name) or any(
|
||||||
|
part == ".." for part in member.name.replace("\\", "/").split("/")
|
||||||
|
):
|
||||||
|
raise error_class(
|
||||||
|
f"Unsafe path in tar archive: {member.name} (potential path traversal)"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Confirm the resolved path stays inside dest_dir.
|
||||||
|
member_path = (dest_dir / member.name).resolve()
|
||||||
|
try:
|
||||||
|
member_path.relative_to(dest_resolved)
|
||||||
|
except ValueError:
|
||||||
|
raise error_class(
|
||||||
|
f"Unsafe path in tar archive: {member.name} (potential path traversal)"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Skip tar metadata members — they carry no extractable payload.
|
||||||
|
if member.type in _TAR_METADATA_TYPES:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Reject symlinks and hard links.
|
||||||
|
if member.issym() or member.islnk():
|
||||||
|
raise error_class(
|
||||||
|
f"Symlinks are not allowed in archive: {member.name}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Reject devices, FIFOs and other special file types.
|
||||||
|
if not (member.isreg() or member.isdir()):
|
||||||
|
raise error_class(
|
||||||
|
f"Non-regular file in archive: {member.name}"
|
||||||
|
)
|
||||||
|
|
||||||
|
safe_members.append(member)
|
||||||
|
|
||||||
|
# Extract — use the "data" filter on Python 3.12+ for extra hardening.
|
||||||
|
# On all versions pass only the pre-validated members so that no
|
||||||
|
# unvetted entry (added concurrently or via a race) slips through.
|
||||||
|
if sys.version_info >= (3, 12):
|
||||||
|
tf.extractall(dest_dir, members=safe_members, filter="data") # type: ignore[call-arg]
|
||||||
|
else:
|
||||||
|
tf.extractall(dest_dir, members=safe_members) # noqa: S202 — validated above
|
||||||
|
except error_class:
|
||||||
|
raise
|
||||||
|
except (tarfile.TarError, OSError) as e:
|
||||||
|
raise error_class(f"Failed to read archive {archive_path}: {e}") from e
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class CatalogEntry:
|
class CatalogEntry:
|
||||||
"""Represents a single catalog entry in the catalog stack."""
|
"""Represents a single catalog entry in the catalog stack."""
|
||||||
@@ -1202,10 +1335,10 @@ class ExtensionManager:
|
|||||||
speckit_version: str,
|
speckit_version: str,
|
||||||
priority: int = 10,
|
priority: int = 10,
|
||||||
) -> ExtensionManifest:
|
) -> ExtensionManifest:
|
||||||
"""Install extension from ZIP file.
|
"""Install extension from a ZIP or ``.tar.gz``/``.tgz`` archive.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
zip_path: Path to extension ZIP file
|
zip_path: Path to the extension archive (ZIP or gzipped tarball).
|
||||||
speckit_version: Current spec-kit version
|
speckit_version: Current spec-kit version
|
||||||
priority: Resolution priority (lower = higher precedence, default 10)
|
priority: Resolution priority (lower = higher precedence, default 10)
|
||||||
|
|
||||||
@@ -1213,7 +1346,8 @@ class ExtensionManager:
|
|||||||
Installed extension manifest
|
Installed extension manifest
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
ValidationError: If manifest is invalid or priority is invalid
|
ValidationError: If manifest is invalid, the archive is unsafe, or
|
||||||
|
priority is invalid
|
||||||
CompatibilityError: If extension is incompatible
|
CompatibilityError: If extension is incompatible
|
||||||
"""
|
"""
|
||||||
# Validate priority early
|
# Validate priority early
|
||||||
@@ -1223,21 +1357,27 @@ class ExtensionManager:
|
|||||||
with tempfile.TemporaryDirectory() as tmpdir:
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
temp_path = Path(tmpdir)
|
temp_path = Path(tmpdir)
|
||||||
|
|
||||||
# Extract ZIP safely (prevent Zip Slip attack)
|
archive_fmt = detect_archive_format(str(zip_path))
|
||||||
with zipfile.ZipFile(zip_path, 'r') as zf:
|
|
||||||
# Validate all paths first before extracting anything
|
if archive_fmt == "tar.gz":
|
||||||
temp_path_resolved = temp_path.resolve()
|
# Extract tarball safely (prevent tar slip attack)
|
||||||
for member in zf.namelist():
|
safe_extract_tarball(zip_path, temp_path, ValidationError)
|
||||||
member_path = (temp_path / member).resolve()
|
else:
|
||||||
# Use is_relative_to for safe path containment check
|
# Extract ZIP safely (prevent Zip Slip attack)
|
||||||
try:
|
with zipfile.ZipFile(zip_path, 'r') as zf:
|
||||||
member_path.relative_to(temp_path_resolved)
|
# Validate all paths first before extracting anything
|
||||||
except ValueError:
|
temp_path_resolved = temp_path.resolve()
|
||||||
raise ValidationError(
|
for member in zf.namelist():
|
||||||
f"Unsafe path in ZIP archive: {member} (potential path traversal)"
|
member_path = (temp_path / member).resolve()
|
||||||
)
|
# Use is_relative_to for safe path containment check
|
||||||
# Only extract after all paths are validated
|
try:
|
||||||
zf.extractall(temp_path)
|
member_path.relative_to(temp_path_resolved)
|
||||||
|
except ValueError:
|
||||||
|
raise ValidationError(
|
||||||
|
f"Unsafe path in ZIP archive: {member} (potential path traversal)"
|
||||||
|
)
|
||||||
|
# Only extract after all paths are validated
|
||||||
|
zf.extractall(temp_path)
|
||||||
|
|
||||||
# Find extension directory (may be nested)
|
# Find extension directory (may be nested)
|
||||||
extension_dir = temp_path
|
extension_dir = temp_path
|
||||||
@@ -1251,7 +1391,7 @@ class ExtensionManager:
|
|||||||
manifest_path = extension_dir / "extension.yml"
|
manifest_path = extension_dir / "extension.yml"
|
||||||
|
|
||||||
if not manifest_path.exists():
|
if not manifest_path.exists():
|
||||||
raise ValidationError("No extension.yml found in ZIP file")
|
raise ValidationError("No extension.yml found in archive")
|
||||||
|
|
||||||
# Install from extracted directory
|
# Install from extracted directory
|
||||||
return self.install_from_directory(extension_dir, speckit_version, priority=priority)
|
return self.install_from_directory(extension_dir, speckit_version, priority=priority)
|
||||||
@@ -1965,14 +2105,18 @@ class ExtensionCatalog:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
def download_extension(self, extension_id: str, target_dir: Optional[Path] = None) -> Path:
|
def download_extension(self, extension_id: str, target_dir: Optional[Path] = None) -> Path:
|
||||||
"""Download extension ZIP from catalog.
|
"""Download extension archive from catalog.
|
||||||
|
|
||||||
|
Supports both ZIP (``.zip``) and gzipped tarball (``.tar.gz``/``.tgz``)
|
||||||
|
archives. The format is detected from the download URL's path extension;
|
||||||
|
when ambiguous the ``Content-Type`` header is used as a fallback.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
extension_id: ID of the extension to download
|
extension_id: ID of the extension to download
|
||||||
target_dir: Directory to save ZIP file (defaults to temp directory)
|
target_dir: Directory to save the archive (defaults to cache directory)
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Path to downloaded ZIP file
|
Path to downloaded archive file
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
ExtensionError: If extension not found or download fails
|
ExtensionError: If extension not found or download fails
|
||||||
@@ -2011,21 +2155,60 @@ class ExtensionCatalog:
|
|||||||
target_dir.mkdir(parents=True, exist_ok=True)
|
target_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
version = ext_info.get("version", "unknown")
|
version = ext_info.get("version", "unknown")
|
||||||
zip_filename = f"{extension_id}-{version}.zip"
|
|
||||||
zip_path = target_dir / zip_filename
|
|
||||||
|
|
||||||
# Download the ZIP file
|
# Download the archive. Determine the archive format from the
|
||||||
|
# post-redirect URL first (with Content-Type fallback); only use the
|
||||||
|
# original `download_url` as a last hint if the final URL gives no
|
||||||
|
# signal.
|
||||||
|
final_url = download_url
|
||||||
|
archive_fmt = ""
|
||||||
try:
|
try:
|
||||||
with self._open_url(download_url, timeout=60) as response:
|
with self._open_url(download_url, timeout=60) as response:
|
||||||
zip_data = response.read()
|
final_url = response.geturl()
|
||||||
|
# Re-validate scheme after any redirect to guard against
|
||||||
zip_path.write_bytes(zip_data)
|
# scheme-downgrade. Validate BEFORE reading the body so a
|
||||||
return zip_path
|
# malicious redirect cannot cause us to fetch the payload
|
||||||
|
# over an insecure scheme.
|
||||||
|
_final_parsed = urlparse(final_url)
|
||||||
|
_final_is_localhost = _final_parsed.hostname in (
|
||||||
|
"localhost",
|
||||||
|
"127.0.0.1",
|
||||||
|
"::1",
|
||||||
|
)
|
||||||
|
if _final_parsed.scheme != "https" and not (
|
||||||
|
_final_parsed.scheme == "http" and _final_is_localhost
|
||||||
|
):
|
||||||
|
raise ExtensionError(
|
||||||
|
f"Extension download URL was redirected to a non-HTTPS URL: {final_url}"
|
||||||
|
)
|
||||||
|
content_type = response.headers.get("Content-Type", "")
|
||||||
|
archive_fmt = detect_archive_format(final_url, content_type)
|
||||||
|
if not archive_fmt:
|
||||||
|
archive_fmt = detect_archive_format(download_url)
|
||||||
|
archive_data = response.read()
|
||||||
|
|
||||||
except urllib.error.URLError as e:
|
except urllib.error.URLError as e:
|
||||||
raise ExtensionError(f"Failed to download extension from {download_url}: {e}")
|
raise ExtensionError(f"Failed to download extension from {download_url}: {e}")
|
||||||
except IOError as e:
|
except IOError as e:
|
||||||
raise ExtensionError(f"Failed to save extension ZIP: {e}")
|
raise ExtensionError(f"Failed to read extension archive from {download_url}: {e}")
|
||||||
|
|
||||||
|
# Choose file extension based on detected format.
|
||||||
|
if not archive_fmt:
|
||||||
|
raise ExtensionError(
|
||||||
|
f"Could not determine archive format for {download_url}. "
|
||||||
|
"Ensure the URL points to a .zip or .tar.gz/.tgz file."
|
||||||
|
)
|
||||||
|
if archive_fmt == "tar.gz":
|
||||||
|
archive_filename = f"{extension_id}-{version}.tar.gz"
|
||||||
|
else:
|
||||||
|
archive_filename = f"{extension_id}-{version}.zip"
|
||||||
|
|
||||||
|
archive_path = target_dir / archive_filename
|
||||||
|
try:
|
||||||
|
archive_path.write_bytes(archive_data)
|
||||||
|
except IOError as e:
|
||||||
|
raise ExtensionError(f"Failed to save extension archive: {e}")
|
||||||
|
return archive_path
|
||||||
|
|
||||||
def clear_cache(self):
|
def clear_cache(self):
|
||||||
"""Clear the catalog cache (both legacy and URL-hash-based files)."""
|
"""Clear the catalog cache (both legacy and URL-hash-based files)."""
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ import yaml
|
|||||||
from packaging import version as pkg_version
|
from packaging import version as pkg_version
|
||||||
from packaging.specifiers import SpecifierSet, InvalidSpecifier
|
from packaging.specifiers import SpecifierSet, InvalidSpecifier
|
||||||
|
|
||||||
from .extensions import ExtensionRegistry, normalize_priority
|
from .extensions import ExtensionRegistry, normalize_priority, detect_archive_format, safe_extract_tarball
|
||||||
|
|
||||||
|
|
||||||
def _substitute_core_template(
|
def _substitute_core_template(
|
||||||
@@ -1604,10 +1604,10 @@ class PresetManager:
|
|||||||
speckit_version: str,
|
speckit_version: str,
|
||||||
priority: int = 10,
|
priority: int = 10,
|
||||||
) -> PresetManifest:
|
) -> PresetManifest:
|
||||||
"""Install preset from ZIP file.
|
"""Install preset from a ZIP or ``.tar.gz``/``.tgz`` archive.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
zip_path: Path to preset ZIP file
|
zip_path: Path to the preset archive (ZIP or gzipped tarball).
|
||||||
speckit_version: Current spec-kit version
|
speckit_version: Current spec-kit version
|
||||||
priority: Resolution priority (lower = higher precedence, default 10)
|
priority: Resolution priority (lower = higher precedence, default 10)
|
||||||
|
|
||||||
@@ -1615,7 +1615,8 @@ class PresetManager:
|
|||||||
Installed preset manifest
|
Installed preset manifest
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
PresetValidationError: If manifest is invalid or priority is invalid
|
PresetValidationError: If manifest is invalid, the archive is unsafe,
|
||||||
|
or priority is invalid
|
||||||
PresetCompatibilityError: If pack is incompatible
|
PresetCompatibilityError: If pack is incompatible
|
||||||
"""
|
"""
|
||||||
# Validate priority early
|
# Validate priority early
|
||||||
@@ -1625,18 +1626,24 @@ class PresetManager:
|
|||||||
with tempfile.TemporaryDirectory() as tmpdir:
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
temp_path = Path(tmpdir)
|
temp_path = Path(tmpdir)
|
||||||
|
|
||||||
with zipfile.ZipFile(zip_path, 'r') as zf:
|
archive_fmt = detect_archive_format(str(zip_path))
|
||||||
temp_path_resolved = temp_path.resolve()
|
|
||||||
for member in zf.namelist():
|
if archive_fmt == "tar.gz":
|
||||||
member_path = (temp_path / member).resolve()
|
# Extract tarball safely (prevent tar slip attack)
|
||||||
try:
|
safe_extract_tarball(zip_path, temp_path, PresetValidationError)
|
||||||
member_path.relative_to(temp_path_resolved)
|
else:
|
||||||
except ValueError:
|
with zipfile.ZipFile(zip_path, 'r') as zf:
|
||||||
raise PresetValidationError(
|
temp_path_resolved = temp_path.resolve()
|
||||||
f"Unsafe path in ZIP archive: {member} "
|
for member in zf.namelist():
|
||||||
"(potential path traversal)"
|
member_path = (temp_path / member).resolve()
|
||||||
)
|
try:
|
||||||
zf.extractall(temp_path)
|
member_path.relative_to(temp_path_resolved)
|
||||||
|
except ValueError:
|
||||||
|
raise PresetValidationError(
|
||||||
|
f"Unsafe path in ZIP archive: {member} "
|
||||||
|
"(potential path traversal)"
|
||||||
|
)
|
||||||
|
zf.extractall(temp_path)
|
||||||
|
|
||||||
pack_dir = temp_path
|
pack_dir = temp_path
|
||||||
manifest_path = pack_dir / "preset.yml"
|
manifest_path = pack_dir / "preset.yml"
|
||||||
@@ -1649,7 +1656,7 @@ class PresetManager:
|
|||||||
|
|
||||||
if not manifest_path.exists():
|
if not manifest_path.exists():
|
||||||
raise PresetValidationError(
|
raise PresetValidationError(
|
||||||
"No preset.yml found in ZIP file"
|
"No preset.yml found in archive"
|
||||||
)
|
)
|
||||||
|
|
||||||
return self.install_from_directory(pack_dir, speckit_version, priority)
|
return self.install_from_directory(pack_dir, speckit_version, priority)
|
||||||
@@ -2242,14 +2249,18 @@ class PresetCatalog:
|
|||||||
def download_pack(
|
def download_pack(
|
||||||
self, pack_id: str, target_dir: Optional[Path] = None
|
self, pack_id: str, target_dir: Optional[Path] = None
|
||||||
) -> Path:
|
) -> Path:
|
||||||
"""Download preset ZIP from catalog.
|
"""Download preset archive from catalog.
|
||||||
|
|
||||||
|
Supports both ZIP (``.zip``) and gzipped tarball (``.tar.gz``/``.tgz``)
|
||||||
|
archives. The format is detected from the download URL's path extension;
|
||||||
|
when ambiguous the ``Content-Type`` header is used as a fallback.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
pack_id: ID of the preset to download
|
pack_id: ID of the preset to download
|
||||||
target_dir: Directory to save ZIP file (defaults to cache directory)
|
target_dir: Directory to save the archive (defaults to cache directory)
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Path to downloaded ZIP file
|
Path to downloaded archive file
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
PresetError: If pack not found or download fails
|
PresetError: If pack not found or download fails
|
||||||
@@ -2301,22 +2312,61 @@ class PresetCatalog:
|
|||||||
target_dir.mkdir(parents=True, exist_ok=True)
|
target_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
version = pack_info.get("version", "unknown")
|
version = pack_info.get("version", "unknown")
|
||||||
zip_filename = f"{pack_id}-{version}.zip"
|
|
||||||
zip_path = target_dir / zip_filename
|
|
||||||
|
|
||||||
|
# Determine the archive format from the post-redirect URL first
|
||||||
|
# (with Content-Type fallback); only use the original `download_url`
|
||||||
|
# as a last hint if the final URL gives no signal.
|
||||||
|
final_url = download_url
|
||||||
|
archive_fmt = ""
|
||||||
try:
|
try:
|
||||||
with self._open_url(download_url, timeout=60) as response:
|
with self._open_url(download_url, timeout=60) as response:
|
||||||
zip_data = response.read()
|
final_url = response.geturl()
|
||||||
|
# Re-validate scheme after any redirect to guard against
|
||||||
zip_path.write_bytes(zip_data)
|
# scheme-downgrade. Validate BEFORE reading the body so a
|
||||||
return zip_path
|
# malicious redirect cannot cause us to fetch the payload
|
||||||
|
# over an insecure scheme.
|
||||||
|
_final_parsed = urlparse(final_url)
|
||||||
|
_final_is_localhost = _final_parsed.hostname in (
|
||||||
|
"localhost",
|
||||||
|
"127.0.0.1",
|
||||||
|
"::1",
|
||||||
|
)
|
||||||
|
if _final_parsed.scheme != "https" and not (
|
||||||
|
_final_parsed.scheme == "http" and _final_is_localhost
|
||||||
|
):
|
||||||
|
raise PresetError(
|
||||||
|
f"Preset download URL was redirected to a non-HTTPS URL: {final_url}"
|
||||||
|
)
|
||||||
|
content_type = response.headers.get("Content-Type", "")
|
||||||
|
archive_fmt = detect_archive_format(final_url, content_type)
|
||||||
|
if not archive_fmt:
|
||||||
|
archive_fmt = detect_archive_format(download_url)
|
||||||
|
archive_data = response.read()
|
||||||
|
|
||||||
except urllib.error.URLError as e:
|
except urllib.error.URLError as e:
|
||||||
raise PresetError(
|
raise PresetError(
|
||||||
f"Failed to download preset from {download_url}: {e}"
|
f"Failed to download preset from {download_url}: {e}"
|
||||||
)
|
)
|
||||||
except IOError as e:
|
except IOError as e:
|
||||||
raise PresetError(f"Failed to save preset ZIP: {e}")
|
raise PresetError(f"Failed to read preset archive from {download_url}: {e}")
|
||||||
|
|
||||||
|
# Choose file extension based on detected format.
|
||||||
|
if not archive_fmt:
|
||||||
|
raise PresetError(
|
||||||
|
f"Could not determine archive format for {download_url}. "
|
||||||
|
"Ensure the URL points to a .zip or .tar.gz/.tgz file."
|
||||||
|
)
|
||||||
|
if archive_fmt == "tar.gz":
|
||||||
|
archive_filename = f"{pack_id}-{version}.tar.gz"
|
||||||
|
else:
|
||||||
|
archive_filename = f"{pack_id}-{version}.zip"
|
||||||
|
|
||||||
|
archive_path = target_dir / archive_filename
|
||||||
|
try:
|
||||||
|
archive_path.write_bytes(archive_data)
|
||||||
|
except IOError as e:
|
||||||
|
raise PresetError(f"Failed to save preset archive: {e}")
|
||||||
|
return archive_path
|
||||||
|
|
||||||
def clear_cache(self):
|
def clear_cache(self):
|
||||||
"""Clear all catalog cache files, including per-URL hashed caches."""
|
"""Clear all catalog cache files, including per-URL hashed caches."""
|
||||||
|
|||||||
@@ -178,6 +178,47 @@ class TestNormalizePriority:
|
|||||||
assert normalize_priority("invalid", default=1) == 1
|
assert normalize_priority("invalid", default=1) == 1
|
||||||
|
|
||||||
|
|
||||||
|
# ===== detect_archive_format Tests =====
|
||||||
|
|
||||||
|
class TestDetectArchiveFormat:
|
||||||
|
"""Test the detect_archive_format helper."""
|
||||||
|
|
||||||
|
def _fmt(self, url, ct=""):
|
||||||
|
from specify_cli.extensions import detect_archive_format
|
||||||
|
return detect_archive_format(url, ct)
|
||||||
|
|
||||||
|
def test_zip_url_extension(self):
|
||||||
|
assert self._fmt("https://example.com/ext-1.0.0.zip") == "zip"
|
||||||
|
|
||||||
|
def test_tar_gz_url_extension(self):
|
||||||
|
assert self._fmt("https://example.com/ext-1.0.0.tar.gz") == "tar.gz"
|
||||||
|
|
||||||
|
def test_tgz_url_extension(self):
|
||||||
|
assert self._fmt("https://example.com/ext-1.0.0.tgz") == "tar.gz"
|
||||||
|
|
||||||
|
def test_zip_uppercase_url_extension(self):
|
||||||
|
assert self._fmt("https://example.com/ext.ZIP") == "zip"
|
||||||
|
|
||||||
|
def test_tar_gz_with_query_string(self):
|
||||||
|
assert self._fmt("https://example.com/ext.tar.gz?token=abc") == "tar.gz"
|
||||||
|
|
||||||
|
def test_zip_content_type_fallback(self):
|
||||||
|
assert self._fmt("https://example.com/download", "application/zip") == "zip"
|
||||||
|
|
||||||
|
def test_gzip_content_type_fallback(self):
|
||||||
|
assert self._fmt("https://example.com/download", "application/gzip") == "tar.gz"
|
||||||
|
|
||||||
|
def test_x_gzip_content_type_fallback(self):
|
||||||
|
assert self._fmt("https://example.com/download", "application/x-gzip") == "tar.gz"
|
||||||
|
|
||||||
|
def test_unknown_returns_empty_string(self):
|
||||||
|
assert self._fmt("https://example.com/workflow.yml") == ""
|
||||||
|
|
||||||
|
def test_url_extension_takes_precedence_over_content_type(self):
|
||||||
|
# URL says .zip — content-type claiming gzip should not override.
|
||||||
|
assert self._fmt("https://example.com/ext.zip", "application/gzip") == "zip"
|
||||||
|
|
||||||
|
|
||||||
# ===== ExtensionManifest Tests =====
|
# ===== ExtensionManifest Tests =====
|
||||||
|
|
||||||
class TestExtensionManifest:
|
class TestExtensionManifest:
|
||||||
@@ -1013,6 +1054,97 @@ class TestExtensionManager:
|
|||||||
assert backup_file.read_text() == "test: config"
|
assert backup_file.read_text() == "test: config"
|
||||||
|
|
||||||
|
|
||||||
|
# ===== install_from_zip Tarball Tests =====
|
||||||
|
|
||||||
|
class TestInstallFromTarball:
|
||||||
|
"""Tests for install_from_zip accepting .tar.gz/.tgz archives."""
|
||||||
|
|
||||||
|
def _make_tarball(self, dest: Path, extension_dir: Path, nested: bool = False) -> None:
|
||||||
|
"""Create a minimal .tar.gz archive from *extension_dir*."""
|
||||||
|
import tarfile
|
||||||
|
with tarfile.open(dest, "w:gz") as tf:
|
||||||
|
for file_path in extension_dir.rglob("*"):
|
||||||
|
if file_path.is_file():
|
||||||
|
arcname = file_path.relative_to(extension_dir)
|
||||||
|
if nested:
|
||||||
|
arcname = Path("test-ext-v1.0.0") / arcname
|
||||||
|
tf.add(file_path, arcname=str(arcname))
|
||||||
|
|
||||||
|
def test_install_from_tar_gz(self, extension_dir, project_dir, temp_dir):
|
||||||
|
"""install_from_zip should accept a .tar.gz archive."""
|
||||||
|
archive = temp_dir / "test-ext-1.0.0.tar.gz"
|
||||||
|
self._make_tarball(archive, extension_dir)
|
||||||
|
|
||||||
|
manager = ExtensionManager(project_dir)
|
||||||
|
manifest = manager.install_from_zip(archive, "0.1.0")
|
||||||
|
assert manifest.id == "test-ext"
|
||||||
|
assert manager.registry.is_installed("test-ext")
|
||||||
|
|
||||||
|
def test_install_from_tgz(self, extension_dir, project_dir, temp_dir):
|
||||||
|
"""install_from_zip should accept a .tgz archive."""
|
||||||
|
archive = temp_dir / "test-ext-1.0.0.tgz"
|
||||||
|
self._make_tarball(archive, extension_dir)
|
||||||
|
|
||||||
|
manager = ExtensionManager(project_dir)
|
||||||
|
manifest = manager.install_from_zip(archive, "0.1.0")
|
||||||
|
assert manifest.id == "test-ext"
|
||||||
|
assert manager.registry.is_installed("test-ext")
|
||||||
|
|
||||||
|
def test_install_from_tar_gz_nested(self, extension_dir, project_dir, temp_dir):
|
||||||
|
"""install_from_zip should handle a single nested directory inside the tarball."""
|
||||||
|
archive = temp_dir / "test-ext-nested.tar.gz"
|
||||||
|
self._make_tarball(archive, extension_dir, nested=True)
|
||||||
|
|
||||||
|
manager = ExtensionManager(project_dir)
|
||||||
|
manifest = manager.install_from_zip(archive, "0.1.0")
|
||||||
|
assert manifest.id == "test-ext"
|
||||||
|
assert manager.registry.is_installed("test-ext")
|
||||||
|
|
||||||
|
def test_install_from_tar_gz_no_manifest(self, project_dir, temp_dir):
|
||||||
|
"""install_from_zip raises ValidationError when tarball has no extension.yml."""
|
||||||
|
import tarfile
|
||||||
|
import io
|
||||||
|
archive = temp_dir / "bad.tar.gz"
|
||||||
|
with tarfile.open(archive, "w:gz") as tf:
|
||||||
|
data = b"no manifest here"
|
||||||
|
info = tarfile.TarInfo(name="readme.txt")
|
||||||
|
info.size = len(data)
|
||||||
|
tf.addfile(info, io.BytesIO(data))
|
||||||
|
|
||||||
|
manager = ExtensionManager(project_dir)
|
||||||
|
with pytest.raises(ValidationError, match="No extension.yml found"):
|
||||||
|
manager.install_from_zip(archive, "0.1.0")
|
||||||
|
|
||||||
|
def test_install_from_tar_gz_rejects_path_traversal(self, project_dir, temp_dir):
|
||||||
|
"""install_from_zip must reject tarballs with path traversal entries."""
|
||||||
|
import tarfile
|
||||||
|
import io
|
||||||
|
archive = temp_dir / "evil.tar.gz"
|
||||||
|
with tarfile.open(archive, "w:gz") as tf:
|
||||||
|
info = tarfile.TarInfo(name="../../evil.txt")
|
||||||
|
data = b"evil"
|
||||||
|
info.size = len(data)
|
||||||
|
tf.addfile(info, io.BytesIO(data))
|
||||||
|
|
||||||
|
manager = ExtensionManager(project_dir)
|
||||||
|
with pytest.raises(ValidationError, match="Unsafe path"):
|
||||||
|
manager.install_from_zip(archive, "0.1.0")
|
||||||
|
|
||||||
|
def test_install_from_tar_gz_rejects_symlinks(self, project_dir, temp_dir):
|
||||||
|
"""install_from_zip must reject tarballs containing symlinks."""
|
||||||
|
import tarfile
|
||||||
|
archive = temp_dir / "symlink.tar.gz"
|
||||||
|
with tarfile.open(archive, "w:gz") as tf:
|
||||||
|
info = tarfile.TarInfo(name="link")
|
||||||
|
info.type = tarfile.SYMTYPE
|
||||||
|
info.linkname = "/etc/passwd"
|
||||||
|
tf.addfile(info)
|
||||||
|
|
||||||
|
manager = ExtensionManager(project_dir)
|
||||||
|
with pytest.raises(ValidationError, match="Symlinks"):
|
||||||
|
manager.install_from_zip(archive, "0.1.0")
|
||||||
|
|
||||||
|
|
||||||
# ===== CommandRegistrar Tests =====
|
# ===== CommandRegistrar Tests =====
|
||||||
|
|
||||||
class TestCommandRegistrar:
|
class TestCommandRegistrar:
|
||||||
@@ -2627,6 +2759,7 @@ class TestExtensionCatalog:
|
|||||||
|
|
||||||
mock_response = MagicMock()
|
mock_response = MagicMock()
|
||||||
mock_response.read.return_value = zip_bytes
|
mock_response.read.return_value = zip_bytes
|
||||||
|
mock_response.geturl.return_value = "https://github.com/org/repo/releases/download/v1/test-ext.zip"
|
||||||
mock_response.__enter__ = lambda s: s
|
mock_response.__enter__ = lambda s: s
|
||||||
mock_response.__exit__ = MagicMock(return_value=False)
|
mock_response.__exit__ = MagicMock(return_value=False)
|
||||||
|
|
||||||
@@ -3529,6 +3662,7 @@ class TestDownloadExtensionBundled:
|
|||||||
|
|
||||||
mock_response = MagicMock()
|
mock_response = MagicMock()
|
||||||
mock_response.read.return_value = b"fake zip data"
|
mock_response.read.return_value = b"fake zip data"
|
||||||
|
mock_response.geturl.return_value = "https://example.com/git-2.0.0.zip"
|
||||||
mock_response.__enter__ = lambda s: s
|
mock_response.__enter__ = lambda s: s
|
||||||
mock_response.__exit__ = MagicMock(return_value=False)
|
mock_response.__exit__ = MagicMock(return_value=False)
|
||||||
|
|
||||||
|
|||||||
@@ -649,6 +649,90 @@ class TestPresetManager:
|
|||||||
with pytest.raises(PresetValidationError, match="No preset.yml found"):
|
with pytest.raises(PresetValidationError, match="No preset.yml found"):
|
||||||
manager.install_from_zip(zip_path, "0.1.5")
|
manager.install_from_zip(zip_path, "0.1.5")
|
||||||
|
|
||||||
|
def _make_tarball(self, dest, pack_dir, nested=False):
|
||||||
|
import tarfile
|
||||||
|
with tarfile.open(dest, "w:gz") as tf:
|
||||||
|
for file_path in pack_dir.rglob("*"):
|
||||||
|
if file_path.is_file():
|
||||||
|
arcname = file_path.relative_to(pack_dir)
|
||||||
|
if nested:
|
||||||
|
arcname = Path("test-pack-v1.0.0") / arcname
|
||||||
|
tf.add(file_path, arcname=str(arcname))
|
||||||
|
|
||||||
|
def test_install_from_tar_gz(self, project_dir, pack_dir, temp_dir):
|
||||||
|
"""Test installing a preset from a .tar.gz archive."""
|
||||||
|
archive = temp_dir / "test-pack-1.0.tar.gz"
|
||||||
|
self._make_tarball(archive, pack_dir)
|
||||||
|
|
||||||
|
manager = PresetManager(project_dir)
|
||||||
|
manifest = manager.install_from_zip(archive, "0.1.5")
|
||||||
|
assert manifest.id == "test-pack"
|
||||||
|
assert manager.registry.is_installed("test-pack")
|
||||||
|
|
||||||
|
def test_install_from_tgz(self, project_dir, pack_dir, temp_dir):
|
||||||
|
"""Test installing a preset from a .tgz archive."""
|
||||||
|
archive = temp_dir / "test-pack-1.0.tgz"
|
||||||
|
self._make_tarball(archive, pack_dir)
|
||||||
|
|
||||||
|
manager = PresetManager(project_dir)
|
||||||
|
manifest = manager.install_from_zip(archive, "0.1.5")
|
||||||
|
assert manifest.id == "test-pack"
|
||||||
|
assert manager.registry.is_installed("test-pack")
|
||||||
|
|
||||||
|
def test_install_from_tar_gz_nested(self, project_dir, pack_dir, temp_dir):
|
||||||
|
"""Test installing a preset from a .tar.gz archive with a single nested directory."""
|
||||||
|
archive = temp_dir / "test-pack-nested.tar.gz"
|
||||||
|
self._make_tarball(archive, pack_dir, nested=True)
|
||||||
|
|
||||||
|
manager = PresetManager(project_dir)
|
||||||
|
manifest = manager.install_from_zip(archive, "0.1.5")
|
||||||
|
assert manifest.id == "test-pack"
|
||||||
|
assert manager.registry.is_installed("test-pack")
|
||||||
|
|
||||||
|
def test_install_from_tar_gz_no_manifest(self, project_dir, temp_dir):
|
||||||
|
"""Test installing a preset from a .tar.gz without preset.yml raises error."""
|
||||||
|
import tarfile
|
||||||
|
import io
|
||||||
|
archive = temp_dir / "bad.tar.gz"
|
||||||
|
with tarfile.open(archive, "w:gz") as tf:
|
||||||
|
data = b"no manifest here"
|
||||||
|
info = tarfile.TarInfo(name="readme.txt")
|
||||||
|
info.size = len(data)
|
||||||
|
tf.addfile(info, io.BytesIO(data))
|
||||||
|
|
||||||
|
manager = PresetManager(project_dir)
|
||||||
|
with pytest.raises(PresetValidationError, match="No preset.yml found"):
|
||||||
|
manager.install_from_zip(archive, "0.1.5")
|
||||||
|
|
||||||
|
def test_install_from_tar_gz_rejects_path_traversal(self, project_dir, temp_dir):
|
||||||
|
"""install_from_zip must reject tarballs with path traversal entries."""
|
||||||
|
import tarfile
|
||||||
|
import io
|
||||||
|
archive = temp_dir / "evil.tar.gz"
|
||||||
|
with tarfile.open(archive, "w:gz") as tf:
|
||||||
|
info = tarfile.TarInfo(name="../../evil.txt")
|
||||||
|
data = b"evil"
|
||||||
|
info.size = len(data)
|
||||||
|
tf.addfile(info, io.BytesIO(data))
|
||||||
|
|
||||||
|
manager = PresetManager(project_dir)
|
||||||
|
with pytest.raises(PresetValidationError, match="Unsafe path"):
|
||||||
|
manager.install_from_zip(archive, "0.1.5")
|
||||||
|
|
||||||
|
def test_install_from_tar_gz_rejects_symlinks(self, project_dir, temp_dir):
|
||||||
|
"""install_from_zip must reject tarballs containing symlinks."""
|
||||||
|
import tarfile
|
||||||
|
archive = temp_dir / "symlink.tar.gz"
|
||||||
|
with tarfile.open(archive, "w:gz") as tf:
|
||||||
|
info = tarfile.TarInfo(name="link")
|
||||||
|
info.type = tarfile.SYMTYPE
|
||||||
|
info.linkname = "/etc/passwd"
|
||||||
|
tf.addfile(info)
|
||||||
|
|
||||||
|
manager = PresetManager(project_dir)
|
||||||
|
with pytest.raises(PresetValidationError, match="Symlinks"):
|
||||||
|
manager.install_from_zip(archive, "0.1.5")
|
||||||
|
|
||||||
def test_remove(self, project_dir, pack_dir):
|
def test_remove(self, project_dir, pack_dir):
|
||||||
"""Test removing a preset."""
|
"""Test removing a preset."""
|
||||||
manager = PresetManager(project_dir)
|
manager = PresetManager(project_dir)
|
||||||
@@ -1529,6 +1613,7 @@ class TestPresetCatalog:
|
|||||||
|
|
||||||
mock_response = MagicMock()
|
mock_response = MagicMock()
|
||||||
mock_response.read.return_value = zip_bytes
|
mock_response.read.return_value = zip_bytes
|
||||||
|
mock_response.geturl.return_value = "https://github.com/org/repo/releases/download/v1/test-pack.zip"
|
||||||
mock_response.__enter__ = lambda s: s
|
mock_response.__enter__ = lambda s: s
|
||||||
mock_response.__exit__ = MagicMock(return_value=False)
|
mock_response.__exit__ = MagicMock(return_value=False)
|
||||||
|
|
||||||
|
|||||||
@@ -1843,3 +1843,230 @@ steps:
|
|||||||
assert state.status == RunStatus.COMPLETED
|
assert state.status == RunStatus.COMPLETED
|
||||||
assert "do-plan" in state.step_results
|
assert "do-plan" in state.step_results
|
||||||
assert "do-specify" not in state.step_results
|
assert "do-specify" not in state.step_results
|
||||||
|
|
||||||
|
|
||||||
|
# ===== workflow add archive CLI tests =====
|
||||||
|
|
||||||
|
MINIMAL_WORKFLOW_YAML = """\
|
||||||
|
schema_version: "1.0"
|
||||||
|
workflow:
|
||||||
|
id: "arc-workflow"
|
||||||
|
name: "Archive Workflow"
|
||||||
|
version: "1.0.0"
|
||||||
|
description: "Installed from archive"
|
||||||
|
steps:
|
||||||
|
- id: step-one
|
||||||
|
type: shell
|
||||||
|
run: "echo hello"
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class TestWorkflowAddArchive:
|
||||||
|
"""CLI-level tests for `workflow add` with local archive files."""
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def project_dir(self, tmp_path):
|
||||||
|
"""Create a minimal spec-kit project."""
|
||||||
|
specify = tmp_path / ".specify"
|
||||||
|
specify.mkdir()
|
||||||
|
(specify / "workflows").mkdir()
|
||||||
|
return tmp_path
|
||||||
|
|
||||||
|
def _runner_and_app(self):
|
||||||
|
from typer.testing import CliRunner
|
||||||
|
from specify_cli import app
|
||||||
|
return CliRunner(), app
|
||||||
|
|
||||||
|
# -- Local ZIP archive --------------------------------------------------
|
||||||
|
|
||||||
|
def test_workflow_add_local_zip_flat(self, project_dir):
|
||||||
|
"""workflow add installs from a local ZIP with workflow.yml at root."""
|
||||||
|
import zipfile
|
||||||
|
from unittest.mock import patch
|
||||||
|
runner, app = self._runner_and_app()
|
||||||
|
|
||||||
|
archive = project_dir / "workflow.zip"
|
||||||
|
with zipfile.ZipFile(archive, "w") as zf:
|
||||||
|
zf.writestr("workflow.yml", MINIMAL_WORKFLOW_YAML)
|
||||||
|
|
||||||
|
with patch.object(Path, "cwd", return_value=project_dir):
|
||||||
|
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False)
|
||||||
|
|
||||||
|
assert result.exit_code == 0, result.output
|
||||||
|
assert "arc-workflow" in result.output
|
||||||
|
installed = project_dir / ".specify" / "workflows" / "arc-workflow" / "workflow.yml"
|
||||||
|
assert installed.exists()
|
||||||
|
|
||||||
|
def test_workflow_add_local_zip_nested(self, project_dir):
|
||||||
|
"""workflow add installs from a local ZIP with workflow.yml in a subdirectory."""
|
||||||
|
import zipfile
|
||||||
|
from unittest.mock import patch
|
||||||
|
runner, app = self._runner_and_app()
|
||||||
|
|
||||||
|
archive = project_dir / "workflow.zip"
|
||||||
|
with zipfile.ZipFile(archive, "w") as zf:
|
||||||
|
zf.writestr("repo-1.0/workflow.yml", MINIMAL_WORKFLOW_YAML)
|
||||||
|
|
||||||
|
with patch.object(Path, "cwd", return_value=project_dir):
|
||||||
|
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False)
|
||||||
|
|
||||||
|
assert result.exit_code == 0, result.output
|
||||||
|
assert "arc-workflow" in result.output
|
||||||
|
|
||||||
|
def test_workflow_add_local_zip_missing_workflow_yml(self, project_dir):
|
||||||
|
"""workflow add exits with an error when the ZIP has no workflow.yml."""
|
||||||
|
import zipfile
|
||||||
|
from unittest.mock import patch
|
||||||
|
runner, app = self._runner_and_app()
|
||||||
|
|
||||||
|
archive = project_dir / "empty.zip"
|
||||||
|
with zipfile.ZipFile(archive, "w") as zf:
|
||||||
|
zf.writestr("README.md", "nothing here")
|
||||||
|
|
||||||
|
with patch.object(Path, "cwd", return_value=project_dir):
|
||||||
|
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=True)
|
||||||
|
|
||||||
|
assert result.exit_code != 0
|
||||||
|
assert "extract" in result.output.lower() or "workflow" in result.output.lower()
|
||||||
|
|
||||||
|
# -- Local tar.gz archive -----------------------------------------------
|
||||||
|
|
||||||
|
def test_workflow_add_local_tar_gz_flat(self, project_dir):
|
||||||
|
"""workflow add installs from a local .tar.gz with workflow.yml at root."""
|
||||||
|
import tarfile, io
|
||||||
|
from unittest.mock import patch
|
||||||
|
runner, app = self._runner_and_app()
|
||||||
|
|
||||||
|
archive = project_dir / "workflow.tar.gz"
|
||||||
|
with tarfile.open(archive, "w:gz") as tf:
|
||||||
|
data = MINIMAL_WORKFLOW_YAML.encode()
|
||||||
|
info = tarfile.TarInfo(name="workflow.yml")
|
||||||
|
info.size = len(data)
|
||||||
|
tf.addfile(info, io.BytesIO(data))
|
||||||
|
|
||||||
|
with patch.object(Path, "cwd", return_value=project_dir):
|
||||||
|
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False)
|
||||||
|
|
||||||
|
assert result.exit_code == 0, result.output
|
||||||
|
assert "arc-workflow" in result.output
|
||||||
|
installed = project_dir / ".specify" / "workflows" / "arc-workflow" / "workflow.yml"
|
||||||
|
assert installed.exists()
|
||||||
|
|
||||||
|
def test_workflow_add_local_tar_gz_nested(self, project_dir):
|
||||||
|
"""workflow add installs from a local .tar.gz with workflow.yml in a subdirectory."""
|
||||||
|
import tarfile, io
|
||||||
|
from unittest.mock import patch
|
||||||
|
runner, app = self._runner_and_app()
|
||||||
|
|
||||||
|
archive = project_dir / "workflow.tar.gz"
|
||||||
|
with tarfile.open(archive, "w:gz") as tf:
|
||||||
|
data = MINIMAL_WORKFLOW_YAML.encode()
|
||||||
|
info = tarfile.TarInfo(name="repo-1.0/workflow.yml")
|
||||||
|
info.size = len(data)
|
||||||
|
tf.addfile(info, io.BytesIO(data))
|
||||||
|
|
||||||
|
with patch.object(Path, "cwd", return_value=project_dir):
|
||||||
|
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False)
|
||||||
|
|
||||||
|
assert result.exit_code == 0, result.output
|
||||||
|
assert "arc-workflow" in result.output
|
||||||
|
|
||||||
|
def test_workflow_add_local_tgz_flat(self, project_dir):
|
||||||
|
"""workflow add recognises the .tgz extension as a gzipped tarball."""
|
||||||
|
import tarfile, io
|
||||||
|
from unittest.mock import patch
|
||||||
|
runner, app = self._runner_and_app()
|
||||||
|
|
||||||
|
archive = project_dir / "workflow.tgz"
|
||||||
|
with tarfile.open(archive, "w:gz") as tf:
|
||||||
|
data = MINIMAL_WORKFLOW_YAML.encode()
|
||||||
|
info = tarfile.TarInfo(name="workflow.yml")
|
||||||
|
info.size = len(data)
|
||||||
|
tf.addfile(info, io.BytesIO(data))
|
||||||
|
|
||||||
|
with patch.object(Path, "cwd", return_value=project_dir):
|
||||||
|
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False)
|
||||||
|
|
||||||
|
assert result.exit_code == 0, result.output
|
||||||
|
assert "arc-workflow" in result.output
|
||||||
|
|
||||||
|
def test_workflow_add_local_tar_gz_missing_workflow_yml(self, project_dir):
|
||||||
|
"""workflow add exits with an error when the .tar.gz has no workflow.yml."""
|
||||||
|
import tarfile, io
|
||||||
|
from unittest.mock import patch
|
||||||
|
runner, app = self._runner_and_app()
|
||||||
|
|
||||||
|
archive = project_dir / "empty.tar.gz"
|
||||||
|
with tarfile.open(archive, "w:gz") as tf:
|
||||||
|
data = b"nothing"
|
||||||
|
info = tarfile.TarInfo(name="README.md")
|
||||||
|
info.size = len(data)
|
||||||
|
tf.addfile(info, io.BytesIO(data))
|
||||||
|
|
||||||
|
with patch.object(Path, "cwd", return_value=project_dir):
|
||||||
|
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=True)
|
||||||
|
|
||||||
|
assert result.exit_code != 0
|
||||||
|
assert "extract" in result.output.lower() or "workflow" in result.output.lower()
|
||||||
|
|
||||||
|
# -- URL archive download -----------------------------------------------
|
||||||
|
|
||||||
|
def test_workflow_add_url_tar_gz(self, project_dir):
|
||||||
|
"""workflow add downloads a .tar.gz from a URL and installs the workflow."""
|
||||||
|
import tarfile, io
|
||||||
|
from unittest.mock import patch, MagicMock
|
||||||
|
runner, app = self._runner_and_app()
|
||||||
|
|
||||||
|
# Build an in-memory tar.gz archive containing workflow.yml.
|
||||||
|
buf = io.BytesIO()
|
||||||
|
with tarfile.open(fileobj=buf, mode="w:gz") as tf:
|
||||||
|
data = MINIMAL_WORKFLOW_YAML.encode()
|
||||||
|
info = tarfile.TarInfo(name="workflow.yml")
|
||||||
|
info.size = len(data)
|
||||||
|
tf.addfile(info, io.BytesIO(data))
|
||||||
|
raw_bytes = buf.getvalue()
|
||||||
|
|
||||||
|
mock_resp = MagicMock()
|
||||||
|
mock_resp.geturl.return_value = "https://example.com/workflow.tar.gz"
|
||||||
|
mock_resp.headers.get.return_value = "application/gzip"
|
||||||
|
mock_resp.read.return_value = raw_bytes
|
||||||
|
mock_resp.__enter__ = lambda s: s
|
||||||
|
mock_resp.__exit__ = MagicMock(return_value=False)
|
||||||
|
|
||||||
|
with patch("urllib.request.urlopen", return_value=mock_resp), \
|
||||||
|
patch.object(Path, "cwd", return_value=project_dir):
|
||||||
|
result = runner.invoke(
|
||||||
|
app, ["workflow", "add", "https://example.com/workflow.tar.gz"],
|
||||||
|
catch_exceptions=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result.exit_code == 0, result.output
|
||||||
|
assert "arc-workflow" in result.output
|
||||||
|
|
||||||
|
def test_workflow_add_url_zip(self, project_dir):
|
||||||
|
"""workflow add downloads a .zip from a URL and installs the workflow."""
|
||||||
|
import zipfile, io
|
||||||
|
from unittest.mock import patch, MagicMock
|
||||||
|
runner, app = self._runner_and_app()
|
||||||
|
|
||||||
|
buf = io.BytesIO()
|
||||||
|
with zipfile.ZipFile(buf, "w") as zf:
|
||||||
|
zf.writestr("workflow.yml", MINIMAL_WORKFLOW_YAML)
|
||||||
|
raw_bytes = buf.getvalue()
|
||||||
|
|
||||||
|
mock_resp = MagicMock()
|
||||||
|
mock_resp.geturl.return_value = "https://example.com/workflow.zip"
|
||||||
|
mock_resp.headers.get.return_value = "application/zip"
|
||||||
|
mock_resp.read.return_value = raw_bytes
|
||||||
|
mock_resp.__enter__ = lambda s: s
|
||||||
|
mock_resp.__exit__ = MagicMock(return_value=False)
|
||||||
|
|
||||||
|
with patch("urllib.request.urlopen", return_value=mock_resp), \
|
||||||
|
patch.object(Path, "cwd", return_value=project_dir):
|
||||||
|
result = runner.invoke(
|
||||||
|
app, ["workflow", "add", "https://example.com/workflow.zip"],
|
||||||
|
catch_exceptions=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result.exit_code == 0, result.output
|
||||||
|
assert "arc-workflow" in result.output
|
||||||
|
|||||||
Reference in New Issue
Block a user