Compare commits

...

23 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
5d75366fd5 Validate post-redirect URL scheme before reading response body 2026-05-12 16:23:24 +00:00
Manfred Riem
7344071b7e Potential fix for pull request finding 'Variable defined multiple times'
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
2026-05-12 09:57:04 -05:00
copilot-swe-agent[bot]
a69d427e03 Prefer final_url over original URL for archive format detection in download paths 2026-05-12 13:19:50 +00:00
copilot-swe-agent[bot]
a8320d9b61 Fix safe_extract_tarball: pass safe_members to extractall on Python 3.12+ 2026-05-11 15:28:48 +00:00
Manfred Riem
0825f508a7 Potential fix for pull request finding 'Variable defined multiple times'
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
2026-05-08 14:11:55 -05:00
Manfred Riem
eec1291896 Potential fix for pull request finding 'Variable defined multiple times'
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
2026-05-08 14:11:39 -05:00
Manfred Riem
7ff9c8bdd4 Potential fix for pull request finding 'Variable defined multiple times'
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
2026-05-08 14:11:15 -05:00
copilot-swe-agent[bot]
1015ff24da Fix GNU sparse skip in safe_extract_tarball; use response.geturl() for redirect-safe format detection and HTTPS re-check
Agent-Logs-Url: https://github.com/github/spec-kit/sessions/739d3f73-200b-417a-8a86-134329200560

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>
2026-05-07 18:53:25 +00:00
copilot-swe-agent[bot]
05798a9e70 Skip PAX/GNU metadata members in safe_extract_tarball; use standard mock imports in workflow tests
Agent-Logs-Url: https://github.com/github/spec-kit/sessions/c1fcc1ff-8766-4d97-90a5-368447980acf

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>
2026-05-07 17:57:01 +00:00
Manfred Riem
bd04937927 Potential fix for pull request finding 'Empty except'
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
2026-05-07 11:39:51 -05:00
Manfred Riem
1786d27c06 Address PR review: fix extractfile fallback and add OSError handling
- Fix tar.gz extractfile() None fallback in extension_update: nested-directory
  search now runs whenever manifest_data is still None, not only on KeyError
- Add OSError handling around write_bytes in preset --from URL path
- Add OSError handling around write_bytes in extension --from URL path
2026-05-07 08:17:45 -05:00
copilot-swe-agent[bot]
0a02369ebe Make detect_archive_format/safe_extract_tarball public; add workflow add archive CLI tests
Agent-Logs-Url: https://github.com/github/spec-kit/sessions/845e41d1-75e3-49fb-a580-a7fb805dd716

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>
2026-05-06 21:50:25 +00:00
copilot-swe-agent[bot]
e0495ebc38 Fix arc_tmp_path UnboundLocalError in workflow install; add preset symlink rejection test
Agent-Logs-Url: https://github.com/github/spec-kit/sessions/0469bac5-296a-46b6-b84e-eb33b0dc0fce

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>
2026-05-06 21:27:40 +00:00
copilot-swe-agent[bot]
cb87a410f8 Fix path traversal risk in extension URL download filename; fix redundant except clause
Agent-Logs-Url: https://github.com/github/spec-kit/sessions/0c7ae935-443c-4e90-ba92-7c3234a46673

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>
2026-05-06 19:51:00 +00:00
copilot-swe-agent[bot]
0fd0bf6b9f Catch TarError/OSError in _safe_extract_tarball; rename zip_path to archive_path in extension_update
Agent-Logs-Url: https://github.com/github/spec-kit/sessions/953d7f62-a75a-4690-90a9-98345cae824d

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>
2026-05-06 19:38:57 +00:00
copilot-swe-agent[bot]
d00509e770 Fix IOError messages, close tf.extractfile() handles, mention .tgz in error messages
Agent-Logs-Url: https://github.com/github/spec-kit/sessions/891dfd6f-0f75-4522-bcd2-8a6fffb2d5f7

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>
2026-05-06 19:22:38 +00:00
Manfred Riem
c44cc245ed Address Copilot PR review: reject unknown archive formats, fix case-sensitive check
- Add explanatory comment to empty except KeyError block in _extract_workflow_yml
- Use case-insensitive extension matching for local archive detection in workflow add
- Reject unknown archive formats with clear error messages instead of silently
  defaulting to ZIP in preset add --from, extension add --from, download_extension(),
  and download_pack()
2026-05-06 07:03:57 -05:00
copilot-swe-agent[bot]
0c6cc4502c Fix type hint, add null checks for tf.extractfile() return value
Agent-Logs-Url: https://github.com/github/spec-kit/sessions/9fb9a8ea-0967-4baf-b95c-7101e423ff58

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>
2026-04-28 18:14:52 +00:00
copilot-swe-agent[bot]
d78ead1802 Remove unnecessary import aliases, use consistent names
Agent-Logs-Url: https://github.com/github/spec-kit/sessions/9fb9a8ea-0967-4baf-b95c-7101e423ff58

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>
2026-04-28 18:12:43 +00:00
copilot-swe-agent[bot]
b3a60f5fba Improve tarball extraction security and cleanup logic
Agent-Logs-Url: https://github.com/github/spec-kit/sessions/9fb9a8ea-0967-4baf-b95c-7101e423ff58

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>
2026-04-28 18:09:06 +00:00
copilot-swe-agent[bot]
b37f117cf9 Address code review: fix import style and rename local aliases
Agent-Logs-Url: https://github.com/github/spec-kit/sessions/9fb9a8ea-0967-4baf-b95c-7101e423ff58

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>
2026-04-28 18:06:49 +00:00
copilot-swe-agent[bot]
a434e5a8ed Add .tar.gz/.tgz archive support for extension, preset, and workflow installation
Agent-Logs-Url: https://github.com/github/spec-kit/sessions/9fb9a8ea-0967-4baf-b95c-7101e423ff58

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>
2026-04-28 18:04:33 +00:00
copilot-swe-agent[bot]
1bda2f0cb4 Initial plan 2026-04-28 17:49:10 +00:00
6 changed files with 999 additions and 100 deletions

View File

@@ -2576,7 +2576,7 @@ def preset_list():
@preset_app.command("add") @preset_app.command("add")
def preset_add( def preset_add(
preset_id: str = typer.Argument(None, help="Preset ID to install from catalog"), preset_id: str = typer.Argument(None, help="Preset ID to install from catalog"),
from_url: str = typer.Option(None, "--from", help="Install from a URL (ZIP file)"), from_url: str = typer.Option(None, "--from", help="Install from a URL (ZIP or .tar.gz/.tgz archive)"),
dev: str = typer.Option(None, "--dev", help="Install from local directory (development mode)"), dev: str = typer.Option(None, "--dev", help="Install from local directory (development mode)"),
priority: int = typer.Option(10, "--priority", help="Resolution priority (lower = higher precedence, default 10)"), priority: int = typer.Option(10, "--priority", help="Resolution priority (lower = higher precedence, default 10)"),
): ):
@@ -2629,17 +2629,46 @@ def preset_add(
import urllib.request import urllib.request
import urllib.error import urllib.error
import tempfile import tempfile
from .extensions import detect_archive_format as _det_fmt
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
zip_path = Path(tmpdir) / "preset.zip" final_url = from_url
archive_fmt = ""
try: try:
with urllib.request.urlopen(from_url, timeout=60) as response: with urllib.request.urlopen(from_url, timeout=60) as response:
zip_path.write_bytes(response.read()) final_url = response.geturl()
# Re-validate scheme after any redirect (scheme-downgrade
# guard). Check BEFORE reading the body so an insecure
# redirect cannot cause us to fetch the payload.
_fp = _urlparse(final_url)
_fl = _fp.hostname in ("localhost", "127.0.0.1", "::1")
if _fp.scheme != "https" and not (_fp.scheme == "http" and _fl):
console.print(f"[red]Error:[/red] URL was redirected to a non-HTTPS URL: {final_url}")
raise typer.Exit(1)
content_type = response.headers.get("Content-Type", "")
# Prefer the post-redirect URL for format detection;
# fall back to the original URL only as a last hint.
archive_fmt = _det_fmt(final_url, content_type)
if not archive_fmt:
archive_fmt = _det_fmt(from_url)
archive_data = response.read()
except urllib.error.URLError as e: except urllib.error.URLError as e:
console.print(f"[red]Error:[/red] Failed to download: {e}") console.print(f"[red]Error:[/red] Failed to download: {e}")
raise typer.Exit(1) raise typer.Exit(1)
manifest = manager.install_from_zip(zip_path, speckit_version, priority) if not archive_fmt:
console.print("[red]Error:[/red] Could not determine archive format from URL or Content-Type.")
console.print("Ensure the URL points to a .zip or .tar.gz/.tgz file.")
raise typer.Exit(1)
suffix = ".tar.gz" if archive_fmt == "tar.gz" else ".zip"
archive_path = Path(tmpdir) / f"preset{suffix}"
try:
archive_path.write_bytes(archive_data)
manifest = manager.install_from_zip(archive_path, speckit_version, priority)
except OSError as e:
console.print(f"[red]Error:[/red] Failed to save or install archive: {e}")
raise typer.Exit(1)
console.print(f"[green]✓[/green] Preset '{manifest.name}' v{manifest.version} installed (priority {priority})") console.print(f"[green]✓[/green] Preset '{manifest.name}' v{manifest.version} installed (priority {priority})")
@@ -3573,7 +3602,7 @@ def catalog_remove(
def extension_add( def extension_add(
extension: str = typer.Argument(help="Extension name or path"), extension: str = typer.Argument(help="Extension name or path"),
dev: bool = typer.Option(False, "--dev", help="Install from local directory"), dev: bool = typer.Option(False, "--dev", help="Install from local directory"),
from_url: Optional[str] = typer.Option(None, "--from", help="Install from custom URL"), from_url: Optional[str] = typer.Option(None, "--from", help="Install from custom URL (ZIP or .tar.gz/.tgz archive)"),
priority: int = typer.Option(10, "--priority", help="Resolution priority (lower = higher precedence, default 10)"), priority: int = typer.Option(10, "--priority", help="Resolution priority (lower = higher precedence, default 10)"),
): ):
"""Install an extension.""" """Install an extension."""
@@ -3612,10 +3641,11 @@ def extension_add(
manifest = manager.install_from_directory(source_path, speckit_version, priority=priority) manifest = manager.install_from_directory(source_path, speckit_version, priority=priority)
elif from_url: elif from_url:
# Install from URL (ZIP file) # Install from URL (ZIP or tar.gz archive)
import urllib.request import urllib.request
import urllib.error import urllib.error
from urllib.parse import urlparse from urllib.parse import urlparse
from .extensions import detect_archive_format
# Validate URL # Validate URL
parsed = urlparse(from_url) parsed = urlparse(from_url)
@@ -3631,25 +3661,53 @@ def extension_add(
console.print("Only install extensions from sources you trust.\n") console.print("Only install extensions from sources you trust.\n")
console.print(f"Downloading from {from_url}...") console.print(f"Downloading from {from_url}...")
# Download ZIP to temp location # Download archive to temp location; detect format from the
# post-redirect URL (with Content-Type fallback), only using
# the original URL as a last hint.
download_dir = project_root / ".specify" / "extensions" / ".cache" / "downloads" download_dir = project_root / ".specify" / "extensions" / ".cache" / "downloads"
download_dir.mkdir(parents=True, exist_ok=True) download_dir.mkdir(parents=True, exist_ok=True)
zip_path = download_dir / f"{extension}-url-download.zip" archive_fmt = ""
archive_path = None
try: try:
with urllib.request.urlopen(from_url, timeout=60) as response: with urllib.request.urlopen(from_url, timeout=60) as response:
zip_data = response.read() final_url = response.geturl()
zip_path.write_bytes(zip_data) # Re-validate scheme after any redirect (scheme-downgrade
# guard). Check BEFORE reading the body so an insecure
# redirect cannot cause us to fetch the payload.
_fp = urlparse(final_url)
_fl = _fp.hostname in ("localhost", "127.0.0.1", "::1")
if _fp.scheme != "https" and not (_fp.scheme == "http" and _fl):
console.print(f"[red]Error:[/red] URL was redirected to a non-HTTPS URL: {final_url}")
raise typer.Exit(1)
content_type = response.headers.get("Content-Type", "")
archive_fmt = detect_archive_format(final_url, content_type)
if not archive_fmt:
archive_fmt = detect_archive_format(from_url)
archive_data = response.read()
# Install from downloaded ZIP if not archive_fmt:
manifest = manager.install_from_zip(zip_path, speckit_version, priority=priority) console.print("[red]Error:[/red] Could not determine archive format from URL or Content-Type.")
console.print("Ensure the URL points to a .zip or .tar.gz/.tgz file.")
raise typer.Exit(1)
suffix = ".tar.gz" if archive_fmt == "tar.gz" else ".zip"
safe_name = Path(extension).name or "extension"
archive_path = download_dir / f"{safe_name}-url-download{suffix}"
archive_path.write_bytes(archive_data)
# Install from downloaded archive
manifest = manager.install_from_zip(archive_path, speckit_version, priority=priority)
except urllib.error.URLError as e: except urllib.error.URLError as e:
console.print(f"[red]Error:[/red] Failed to download from {from_url}: {e}") console.print(f"[red]Error:[/red] Failed to download from {from_url}: {e}")
raise typer.Exit(1) raise typer.Exit(1)
except OSError as e:
console.print(f"[red]Error:[/red] Failed to save or install archive: {e}")
raise typer.Exit(1)
finally: finally:
# Clean up downloaded ZIP # Clean up the downloaded archive
if zip_path.exists(): if archive_path is not None and archive_path.exists():
zip_path.unlink() archive_path.unlink()
else: else:
# Try bundled extensions first (shipped with spec-kit) # Try bundled extensions first (shipped with spec-kit)
@@ -4301,29 +4359,55 @@ def extension_update(
backup_hooks[hook_name] = ext_hooks backup_hooks[hook_name] = ext_hooks
# 5. Download new version # 5. Download new version
zip_path = catalog.download_extension(extension_id) archive_path = catalog.download_extension(extension_id)
try: try:
# 6. Validate extension ID from ZIP BEFORE modifying installation # 6. Validate extension ID from archive BEFORE modifying installation
# Handle both root-level and nested extension.yml (GitHub auto-generated ZIPs) # Handle both root-level and nested extension.yml (GitHub auto-generated archives)
with zipfile.ZipFile(zip_path, "r") as zf: from .extensions import detect_archive_format
import yaml import tarfile
manifest_data = None archive_fmt = detect_archive_format(str(archive_path))
namelist = zf.namelist() import yaml
manifest_data = None
# First try root-level extension.yml if archive_fmt == "tar.gz":
if "extension.yml" in namelist: with tarfile.open(archive_path, "r:gz") as tf:
with zf.open("extension.yml") as f: # First try root-level extension.yml
manifest_data = yaml.safe_load(f) or {} try:
else: m = tf.getmember("extension.yml")
# Look for extension.yml in a single top-level subdirectory f = tf.extractfile(m)
# (e.g., "repo-name-branch/extension.yml") if f is not None:
manifest_paths = [n for n in namelist if n.endswith("/extension.yml") and n.count("/") == 1] with f:
if len(manifest_paths) == 1: manifest_data = yaml.safe_load(f.read()) or {}
with zf.open(manifest_paths[0]) as f: except KeyError:
# extension.yml not present at archive root; use nested fallback below.
manifest_data = None
# Fall back to nested-directory search if root-level
# was missing (KeyError) or not a regular file (None).
if manifest_data is None:
members = [m for m in tf.getmembers() if m.name.endswith("/extension.yml") and m.name.count("/") == 1]
if len(members) == 1:
f = tf.extractfile(members[0])
if f is not None:
with f:
manifest_data = yaml.safe_load(f.read()) or {}
else:
with zipfile.ZipFile(archive_path, "r") as zf:
namelist = zf.namelist()
# First try root-level extension.yml
if "extension.yml" in namelist:
with zf.open("extension.yml") as f:
manifest_data = yaml.safe_load(f) or {} manifest_data = yaml.safe_load(f) or {}
else:
# Look for extension.yml in a single top-level subdirectory
# (e.g., "repo-name-branch/extension.yml")
manifest_paths = [n for n in namelist if n.endswith("/extension.yml") and n.count("/") == 1]
if len(manifest_paths) == 1:
with zf.open(manifest_paths[0]) as f:
manifest_data = yaml.safe_load(f) or {}
if manifest_data is None: if manifest_data is None:
raise ValueError("Downloaded extension archive is missing 'extension.yml'") raise ValueError("Downloaded extension archive is missing 'extension.yml'")
zip_extension_id = manifest_data.get("extension", {}).get("id") zip_extension_id = manifest_data.get("extension", {}).get("id")
if zip_extension_id != extension_id: if zip_extension_id != extension_id:
@@ -4335,7 +4419,7 @@ def extension_update(
manager.remove(extension_id, keep_config=True) manager.remove(extension_id, keep_config=True)
# 8. Install new version # 8. Install new version
_ = manager.install_from_zip(zip_path, speckit_version) _ = manager.install_from_zip(archive_path, speckit_version)
# Restore user config files from backup after successful install. # Restore user config files from backup after successful install.
new_extension_dir = manager.extensions_dir / extension_id new_extension_dir = manager.extensions_dir / extension_id
@@ -4381,9 +4465,9 @@ def extension_update(
hook["enabled"] = False hook["enabled"] = False
hook_executor.save_project_config(config) hook_executor.save_project_config(config)
finally: finally:
# Clean up downloaded ZIP # Clean up downloaded archive
if zip_path.exists(): if archive_path.exists():
zip_path.unlink() archive_path.unlink()
# 10. Clean up backup on success # 10. Clean up backup on success
if backup_base.exists(): if backup_base.exists():
@@ -4875,6 +4959,59 @@ def workflow_list():
console.print() console.print()
def _extract_workflow_yml(archive_path: Path, archive_fmt: str) -> bytes:
"""Extract ``workflow.yml`` from a ZIP or ``.tar.gz`` archive.
Searches the archive root and a single nested top-level subdirectory
(e.g., ``repo-name-1.0/workflow.yml``).
Args:
archive_path: Path to the downloaded archive.
archive_fmt: ``"zip"`` or ``"tar.gz"``.
Returns:
Raw bytes of the ``workflow.yml`` file.
Raises:
ValueError: If no ``workflow.yml`` is found in the archive.
"""
import tarfile
if archive_fmt == "tar.gz":
with tarfile.open(archive_path, "r:gz") as tf:
# Try root-level first.
try:
f = tf.extractfile(tf.getmember("workflow.yml"))
if f is not None:
with f:
return f.read()
except KeyError:
pass # Root-level workflow.yml not found; fall through to subdirectory search below.
# Look in a single top-level subdirectory.
candidates = [
m for m in tf.getmembers()
if m.name.endswith("/workflow.yml") and m.name.count("/") == 1
]
if len(candidates) == 1:
f = tf.extractfile(candidates[0])
if f is not None:
with f:
return f.read()
else:
with zipfile.ZipFile(archive_path, "r") as zf:
namelist = zf.namelist()
if "workflow.yml" in namelist:
return zf.read("workflow.yml")
candidates = [
n for n in namelist
if n.endswith("/workflow.yml") and n.count("/") == 1
]
if len(candidates) == 1:
return zf.read(candidates[0])
raise ValueError("No workflow.yml found in the downloaded archive")
@workflow_app.command("add") @workflow_app.command("add")
def workflow_add( def workflow_add(
source: str = typer.Argument(..., help="Workflow ID, URL, or local path"), source: str = typer.Argument(..., help="Workflow ID, URL, or local path"),
@@ -4928,6 +5065,7 @@ def workflow_add(
from ipaddress import ip_address from ipaddress import ip_address
from urllib.parse import urlparse from urllib.parse import urlparse
from urllib.request import urlopen # noqa: S310 from urllib.request import urlopen # noqa: S310
from .extensions import detect_archive_format
parsed_src = urlparse(source) parsed_src = urlparse(source)
src_host = parsed_src.hostname or "" src_host = parsed_src.hostname or ""
@@ -4958,18 +5096,53 @@ def workflow_add(
if final_parsed.scheme != "https" and not (final_parsed.scheme == "http" and final_lb): if final_parsed.scheme != "https" and not (final_parsed.scheme == "http" and final_lb):
console.print(f"[red]Error:[/red] URL redirected to non-HTTPS: {final_url}") console.print(f"[red]Error:[/red] URL redirected to non-HTTPS: {final_url}")
raise typer.Exit(1) raise typer.Exit(1)
with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp:
tmp.write(resp.read()) # Detect archive format from the final URL or Content-Type header.
tmp_path = Path(tmp.name) archive_fmt = detect_archive_format(final_url)
if not archive_fmt:
content_type = resp.headers.get("Content-Type", "")
archive_fmt = detect_archive_format(final_url, content_type)
raw_data = resp.read()
except typer.Exit: except typer.Exit:
raise raise
except Exception as exc: except Exception as exc:
console.print(f"[red]Error:[/red] Failed to download workflow: {exc}") console.print(f"[red]Error:[/red] Failed to download workflow: {exc}")
raise typer.Exit(1) raise typer.Exit(1)
tmp_path = None
try:
if archive_fmt in ("tar.gz", "zip"):
# Extract workflow.yml from the archive.
suffix = ".tar.gz" if archive_fmt == "tar.gz" else ".zip"
arc_tmp_path = None
with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as arc_tmp:
arc_tmp_path = Path(arc_tmp.name)
arc_tmp.write(raw_data)
try:
wf_yaml = _extract_workflow_yml(arc_tmp_path, archive_fmt)
with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp:
tmp_path = Path(tmp.name)
tmp.write(wf_yaml)
finally:
if arc_tmp_path is not None:
arc_tmp_path.unlink(missing_ok=True)
else:
# Treat as a plain YAML file (existing behaviour).
with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp:
tmp.write(raw_data)
tmp_path = Path(tmp.name)
except typer.Exit:
raise
except Exception as exc:
console.print(f"[red]Error:[/red] Failed to process downloaded workflow: {exc}")
raise typer.Exit(1)
try: try:
_validate_and_install_local(tmp_path, source) _validate_and_install_local(tmp_path, source)
finally: finally:
tmp_path.unlink(missing_ok=True) if tmp_path is not None:
tmp_path.unlink(missing_ok=True)
return return
# Try as a local file/directory # Try as a local file/directory
@@ -4978,6 +5151,27 @@ def workflow_add(
if source_path.is_file() and source_path.suffix in (".yml", ".yaml"): if source_path.is_file() and source_path.suffix in (".yml", ".yaml"):
_validate_and_install_local(source_path, str(source_path)) _validate_and_install_local(source_path, str(source_path))
return return
elif source_path.is_file() and (
source.lower().endswith(".tar.gz") or source.lower().endswith(".tgz") or source.lower().endswith(".zip")
):
# Local archive file containing workflow.yml
from .extensions import detect_archive_format
local_fmt = detect_archive_format(source)
try:
wf_yaml = _extract_workflow_yml(source_path, local_fmt)
except Exception as exc:
console.print(f"[red]Error:[/red] Failed to extract workflow from archive: {exc}")
raise typer.Exit(1)
import tempfile
tmp_local = None
with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp:
tmp_local = Path(tmp.name)
tmp.write(wf_yaml)
try:
_validate_and_install_local(tmp_local, str(source_path))
finally:
tmp_local.unlink(missing_ok=True)
return
elif source_path.is_dir(): elif source_path.is_dir():
wf_file = source_path / "workflow.yml" wf_file = source_path / "workflow.yml"
if not wf_file.exists(): if not wf_file.exists():
@@ -5041,6 +5235,7 @@ def workflow_add(
try: try:
from urllib.request import urlopen # noqa: S310 — URL comes from catalog from urllib.request import urlopen # noqa: S310 — URL comes from catalog
from .extensions import detect_archive_format
workflow_dir.mkdir(parents=True, exist_ok=True) workflow_dir.mkdir(parents=True, exist_ok=True)
with urlopen(workflow_url, timeout=30) as response: # noqa: S310 with urlopen(workflow_url, timeout=30) as response: # noqa: S310
@@ -5063,7 +5258,32 @@ def workflow_add(
f"[red]Error:[/red] Workflow '{source}' redirected to non-HTTPS URL: {final_url}" f"[red]Error:[/red] Workflow '{source}' redirected to non-HTTPS URL: {final_url}"
) )
raise typer.Exit(1) raise typer.Exit(1)
workflow_file.write_bytes(response.read())
# Detect archive format from the final URL or Content-Type header.
cat_archive_fmt = detect_archive_format(final_url)
if not cat_archive_fmt:
cat_ct = response.headers.get("Content-Type", "")
cat_archive_fmt = detect_archive_format(final_url, cat_ct)
raw_response = response.read()
if cat_archive_fmt in ("tar.gz", "zip"):
# Download URL points to an archive — extract workflow.yml from it.
suffix = ".tar.gz" if cat_archive_fmt == "tar.gz" else ".zip"
arc_tmp = None
with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as arc_f:
arc_tmp = Path(arc_f.name)
arc_f.write(raw_response)
try:
wf_yaml_bytes = _extract_workflow_yml(arc_tmp, cat_archive_fmt)
finally:
if arc_tmp is not None:
arc_tmp.unlink(missing_ok=True)
workflow_file.write_bytes(wf_yaml_bytes)
else:
workflow_file.write_bytes(raw_response)
except typer.Exit:
raise
except Exception as exc: except Exception as exc:
if workflow_dir.exists(): if workflow_dir.exists():
import shutil import shutil

View File

@@ -9,6 +9,8 @@ without bloating the core framework.
import json import json
import hashlib import hashlib
import os import os
import sys
import tarfile
import tempfile import tempfile
import zipfile import zipfile
import shutil import shutil
@@ -106,6 +108,137 @@ def normalize_priority(value: Any, default: int = 10) -> int:
return priority if priority >= 1 else default return priority if priority >= 1 else default
def detect_archive_format(url: str, content_type: str = "") -> str:
"""Detect archive format from URL path extension or Content-Type header.
Args:
url: URL or file path to inspect.
content_type: Optional ``Content-Type`` header value from the HTTP response.
Returns:
``"zip"`` for ZIP archives, ``"tar.gz"`` for gzipped tarballs, or ``""``
when the format cannot be determined.
"""
# Strip query-string / fragment before examining the path extension.
url_path = url.split("?")[0].split("#")[0].lower()
if url_path.endswith(".zip"):
return "zip"
if url_path.endswith(".tar.gz") or url_path.endswith(".tgz"):
return "tar.gz"
# Fall back to Content-Type header inspection.
ct = content_type.lower()
if "application/zip" in ct or "application/x-zip" in ct:
return "zip"
if any(
t in ct
for t in (
"application/gzip",
"application/x-gzip",
"application/x-tar+gzip",
)
):
return "tar.gz"
return ""
def safe_extract_tarball(
archive_path: Path,
dest_dir: Path,
error_class: "type[Exception]" = Exception,
) -> None:
"""Safely extract a ``.tar.gz`` or ``.tgz`` archive into *dest_dir*.
All members are validated before extraction to prevent *tar slip*
(path traversal) attacks. Symlinks, hard links, and special files
(devices, FIFOs, etc.) are rejected.
On Python 3.12 and later the ``"data"`` extraction filter is applied
for an additional layer of OS-level protection. On earlier versions
the explicit member list (containing only pre-validated regular files
and directories) is passed to ``extractall()`` — since all symlinks are
already rejected in the validation phase, no archive-introduced symlink
can be followed during extraction.
Args:
archive_path: Path to the ``.tar.gz``/``.tgz`` archive.
dest_dir: Destination directory (must already exist).
error_class: Exception class to raise on unsafe entries.
Raises:
error_class: If any member is unsafe or the archive cannot be read.
"""
dest_resolved = dest_dir.resolve()
# Tar metadata member types to skip during validation — they carry no
# extractable payload and are generated automatically by many common
# archiving tools (e.g. PAX headers, GNU longname/longlink entries).
# GNUTYPE_SPARSE is intentionally excluded: it carries a real file payload
# and tarfile.TarInfo.isreg() returns True for it, so it passes the
# regular-file check below and is extracted correctly.
_TAR_METADATA_TYPES = (
tarfile.XHDTYPE, # PAX extended header
tarfile.XGLTYPE, # PAX global extended header
tarfile.SOLARIS_XHDTYPE, # Solaris PAX extended header
tarfile.GNUTYPE_LONGNAME, # GNU long path name (metadata only)
tarfile.GNUTYPE_LONGLINK, # GNU long link name (metadata only)
)
try:
with tarfile.open(archive_path, "r:gz") as tf:
members = tf.getmembers()
safe_members = []
# Validate every member before extracting anything.
for member in members:
# Reject absolute paths and any path component that is "..".
if os.path.isabs(member.name) or any(
part == ".." for part in member.name.replace("\\", "/").split("/")
):
raise error_class(
f"Unsafe path in tar archive: {member.name} (potential path traversal)"
)
# Confirm the resolved path stays inside dest_dir.
member_path = (dest_dir / member.name).resolve()
try:
member_path.relative_to(dest_resolved)
except ValueError:
raise error_class(
f"Unsafe path in tar archive: {member.name} (potential path traversal)"
)
# Skip tar metadata members — they carry no extractable payload.
if member.type in _TAR_METADATA_TYPES:
continue
# Reject symlinks and hard links.
if member.issym() or member.islnk():
raise error_class(
f"Symlinks are not allowed in archive: {member.name}"
)
# Reject devices, FIFOs and other special file types.
if not (member.isreg() or member.isdir()):
raise error_class(
f"Non-regular file in archive: {member.name}"
)
safe_members.append(member)
# Extract — use the "data" filter on Python 3.12+ for extra hardening.
# On all versions pass only the pre-validated members so that no
# unvetted entry (added concurrently or via a race) slips through.
if sys.version_info >= (3, 12):
tf.extractall(dest_dir, members=safe_members, filter="data") # type: ignore[call-arg]
else:
tf.extractall(dest_dir, members=safe_members) # noqa: S202 — validated above
except error_class:
raise
except (tarfile.TarError, OSError) as e:
raise error_class(f"Failed to read archive {archive_path}: {e}") from e
@dataclass @dataclass
class CatalogEntry: class CatalogEntry:
"""Represents a single catalog entry in the catalog stack.""" """Represents a single catalog entry in the catalog stack."""
@@ -1202,10 +1335,10 @@ class ExtensionManager:
speckit_version: str, speckit_version: str,
priority: int = 10, priority: int = 10,
) -> ExtensionManifest: ) -> ExtensionManifest:
"""Install extension from ZIP file. """Install extension from a ZIP or ``.tar.gz``/``.tgz`` archive.
Args: Args:
zip_path: Path to extension ZIP file zip_path: Path to the extension archive (ZIP or gzipped tarball).
speckit_version: Current spec-kit version speckit_version: Current spec-kit version
priority: Resolution priority (lower = higher precedence, default 10) priority: Resolution priority (lower = higher precedence, default 10)
@@ -1213,7 +1346,8 @@ class ExtensionManager:
Installed extension manifest Installed extension manifest
Raises: Raises:
ValidationError: If manifest is invalid or priority is invalid ValidationError: If manifest is invalid, the archive is unsafe, or
priority is invalid
CompatibilityError: If extension is incompatible CompatibilityError: If extension is incompatible
""" """
# Validate priority early # Validate priority early
@@ -1223,21 +1357,27 @@ class ExtensionManager:
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
temp_path = Path(tmpdir) temp_path = Path(tmpdir)
# Extract ZIP safely (prevent Zip Slip attack) archive_fmt = detect_archive_format(str(zip_path))
with zipfile.ZipFile(zip_path, 'r') as zf:
# Validate all paths first before extracting anything if archive_fmt == "tar.gz":
temp_path_resolved = temp_path.resolve() # Extract tarball safely (prevent tar slip attack)
for member in zf.namelist(): safe_extract_tarball(zip_path, temp_path, ValidationError)
member_path = (temp_path / member).resolve() else:
# Use is_relative_to for safe path containment check # Extract ZIP safely (prevent Zip Slip attack)
try: with zipfile.ZipFile(zip_path, 'r') as zf:
member_path.relative_to(temp_path_resolved) # Validate all paths first before extracting anything
except ValueError: temp_path_resolved = temp_path.resolve()
raise ValidationError( for member in zf.namelist():
f"Unsafe path in ZIP archive: {member} (potential path traversal)" member_path = (temp_path / member).resolve()
) # Use is_relative_to for safe path containment check
# Only extract after all paths are validated try:
zf.extractall(temp_path) member_path.relative_to(temp_path_resolved)
except ValueError:
raise ValidationError(
f"Unsafe path in ZIP archive: {member} (potential path traversal)"
)
# Only extract after all paths are validated
zf.extractall(temp_path)
# Find extension directory (may be nested) # Find extension directory (may be nested)
extension_dir = temp_path extension_dir = temp_path
@@ -1251,7 +1391,7 @@ class ExtensionManager:
manifest_path = extension_dir / "extension.yml" manifest_path = extension_dir / "extension.yml"
if not manifest_path.exists(): if not manifest_path.exists():
raise ValidationError("No extension.yml found in ZIP file") raise ValidationError("No extension.yml found in archive")
# Install from extracted directory # Install from extracted directory
return self.install_from_directory(extension_dir, speckit_version, priority=priority) return self.install_from_directory(extension_dir, speckit_version, priority=priority)
@@ -1965,14 +2105,18 @@ class ExtensionCatalog:
return None return None
def download_extension(self, extension_id: str, target_dir: Optional[Path] = None) -> Path: def download_extension(self, extension_id: str, target_dir: Optional[Path] = None) -> Path:
"""Download extension ZIP from catalog. """Download extension archive from catalog.
Supports both ZIP (``.zip``) and gzipped tarball (``.tar.gz``/``.tgz``)
archives. The format is detected from the download URL's path extension;
when ambiguous the ``Content-Type`` header is used as a fallback.
Args: Args:
extension_id: ID of the extension to download extension_id: ID of the extension to download
target_dir: Directory to save ZIP file (defaults to temp directory) target_dir: Directory to save the archive (defaults to cache directory)
Returns: Returns:
Path to downloaded ZIP file Path to downloaded archive file
Raises: Raises:
ExtensionError: If extension not found or download fails ExtensionError: If extension not found or download fails
@@ -2011,21 +2155,60 @@ class ExtensionCatalog:
target_dir.mkdir(parents=True, exist_ok=True) target_dir.mkdir(parents=True, exist_ok=True)
version = ext_info.get("version", "unknown") version = ext_info.get("version", "unknown")
zip_filename = f"{extension_id}-{version}.zip"
zip_path = target_dir / zip_filename
# Download the ZIP file # Download the archive. Determine the archive format from the
# post-redirect URL first (with Content-Type fallback); only use the
# original `download_url` as a last hint if the final URL gives no
# signal.
final_url = download_url
archive_fmt = ""
try: try:
with self._open_url(download_url, timeout=60) as response: with self._open_url(download_url, timeout=60) as response:
zip_data = response.read() final_url = response.geturl()
# Re-validate scheme after any redirect to guard against
zip_path.write_bytes(zip_data) # scheme-downgrade. Validate BEFORE reading the body so a
return zip_path # malicious redirect cannot cause us to fetch the payload
# over an insecure scheme.
_final_parsed = urlparse(final_url)
_final_is_localhost = _final_parsed.hostname in (
"localhost",
"127.0.0.1",
"::1",
)
if _final_parsed.scheme != "https" and not (
_final_parsed.scheme == "http" and _final_is_localhost
):
raise ExtensionError(
f"Extension download URL was redirected to a non-HTTPS URL: {final_url}"
)
content_type = response.headers.get("Content-Type", "")
archive_fmt = detect_archive_format(final_url, content_type)
if not archive_fmt:
archive_fmt = detect_archive_format(download_url)
archive_data = response.read()
except urllib.error.URLError as e: except urllib.error.URLError as e:
raise ExtensionError(f"Failed to download extension from {download_url}: {e}") raise ExtensionError(f"Failed to download extension from {download_url}: {e}")
except IOError as e: except IOError as e:
raise ExtensionError(f"Failed to save extension ZIP: {e}") raise ExtensionError(f"Failed to read extension archive from {download_url}: {e}")
# Choose file extension based on detected format.
if not archive_fmt:
raise ExtensionError(
f"Could not determine archive format for {download_url}. "
"Ensure the URL points to a .zip or .tar.gz/.tgz file."
)
if archive_fmt == "tar.gz":
archive_filename = f"{extension_id}-{version}.tar.gz"
else:
archive_filename = f"{extension_id}-{version}.zip"
archive_path = target_dir / archive_filename
try:
archive_path.write_bytes(archive_data)
except IOError as e:
raise ExtensionError(f"Failed to save extension archive: {e}")
return archive_path
def clear_cache(self): def clear_cache(self):
"""Clear the catalog cache (both legacy and URL-hash-based files).""" """Clear the catalog cache (both legacy and URL-hash-based files)."""

View File

@@ -27,7 +27,7 @@ import yaml
from packaging import version as pkg_version from packaging import version as pkg_version
from packaging.specifiers import SpecifierSet, InvalidSpecifier from packaging.specifiers import SpecifierSet, InvalidSpecifier
from .extensions import ExtensionRegistry, normalize_priority from .extensions import ExtensionRegistry, normalize_priority, detect_archive_format, safe_extract_tarball
def _substitute_core_template( def _substitute_core_template(
@@ -1604,10 +1604,10 @@ class PresetManager:
speckit_version: str, speckit_version: str,
priority: int = 10, priority: int = 10,
) -> PresetManifest: ) -> PresetManifest:
"""Install preset from ZIP file. """Install preset from a ZIP or ``.tar.gz``/``.tgz`` archive.
Args: Args:
zip_path: Path to preset ZIP file zip_path: Path to the preset archive (ZIP or gzipped tarball).
speckit_version: Current spec-kit version speckit_version: Current spec-kit version
priority: Resolution priority (lower = higher precedence, default 10) priority: Resolution priority (lower = higher precedence, default 10)
@@ -1615,7 +1615,8 @@ class PresetManager:
Installed preset manifest Installed preset manifest
Raises: Raises:
PresetValidationError: If manifest is invalid or priority is invalid PresetValidationError: If manifest is invalid, the archive is unsafe,
or priority is invalid
PresetCompatibilityError: If pack is incompatible PresetCompatibilityError: If pack is incompatible
""" """
# Validate priority early # Validate priority early
@@ -1625,18 +1626,24 @@ class PresetManager:
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
temp_path = Path(tmpdir) temp_path = Path(tmpdir)
with zipfile.ZipFile(zip_path, 'r') as zf: archive_fmt = detect_archive_format(str(zip_path))
temp_path_resolved = temp_path.resolve()
for member in zf.namelist(): if archive_fmt == "tar.gz":
member_path = (temp_path / member).resolve() # Extract tarball safely (prevent tar slip attack)
try: safe_extract_tarball(zip_path, temp_path, PresetValidationError)
member_path.relative_to(temp_path_resolved) else:
except ValueError: with zipfile.ZipFile(zip_path, 'r') as zf:
raise PresetValidationError( temp_path_resolved = temp_path.resolve()
f"Unsafe path in ZIP archive: {member} " for member in zf.namelist():
"(potential path traversal)" member_path = (temp_path / member).resolve()
) try:
zf.extractall(temp_path) member_path.relative_to(temp_path_resolved)
except ValueError:
raise PresetValidationError(
f"Unsafe path in ZIP archive: {member} "
"(potential path traversal)"
)
zf.extractall(temp_path)
pack_dir = temp_path pack_dir = temp_path
manifest_path = pack_dir / "preset.yml" manifest_path = pack_dir / "preset.yml"
@@ -1649,7 +1656,7 @@ class PresetManager:
if not manifest_path.exists(): if not manifest_path.exists():
raise PresetValidationError( raise PresetValidationError(
"No preset.yml found in ZIP file" "No preset.yml found in archive"
) )
return self.install_from_directory(pack_dir, speckit_version, priority) return self.install_from_directory(pack_dir, speckit_version, priority)
@@ -2242,14 +2249,18 @@ class PresetCatalog:
def download_pack( def download_pack(
self, pack_id: str, target_dir: Optional[Path] = None self, pack_id: str, target_dir: Optional[Path] = None
) -> Path: ) -> Path:
"""Download preset ZIP from catalog. """Download preset archive from catalog.
Supports both ZIP (``.zip``) and gzipped tarball (``.tar.gz``/``.tgz``)
archives. The format is detected from the download URL's path extension;
when ambiguous the ``Content-Type`` header is used as a fallback.
Args: Args:
pack_id: ID of the preset to download pack_id: ID of the preset to download
target_dir: Directory to save ZIP file (defaults to cache directory) target_dir: Directory to save the archive (defaults to cache directory)
Returns: Returns:
Path to downloaded ZIP file Path to downloaded archive file
Raises: Raises:
PresetError: If pack not found or download fails PresetError: If pack not found or download fails
@@ -2301,22 +2312,61 @@ class PresetCatalog:
target_dir.mkdir(parents=True, exist_ok=True) target_dir.mkdir(parents=True, exist_ok=True)
version = pack_info.get("version", "unknown") version = pack_info.get("version", "unknown")
zip_filename = f"{pack_id}-{version}.zip"
zip_path = target_dir / zip_filename
# Determine the archive format from the post-redirect URL first
# (with Content-Type fallback); only use the original `download_url`
# as a last hint if the final URL gives no signal.
final_url = download_url
archive_fmt = ""
try: try:
with self._open_url(download_url, timeout=60) as response: with self._open_url(download_url, timeout=60) as response:
zip_data = response.read() final_url = response.geturl()
# Re-validate scheme after any redirect to guard against
zip_path.write_bytes(zip_data) # scheme-downgrade. Validate BEFORE reading the body so a
return zip_path # malicious redirect cannot cause us to fetch the payload
# over an insecure scheme.
_final_parsed = urlparse(final_url)
_final_is_localhost = _final_parsed.hostname in (
"localhost",
"127.0.0.1",
"::1",
)
if _final_parsed.scheme != "https" and not (
_final_parsed.scheme == "http" and _final_is_localhost
):
raise PresetError(
f"Preset download URL was redirected to a non-HTTPS URL: {final_url}"
)
content_type = response.headers.get("Content-Type", "")
archive_fmt = detect_archive_format(final_url, content_type)
if not archive_fmt:
archive_fmt = detect_archive_format(download_url)
archive_data = response.read()
except urllib.error.URLError as e: except urllib.error.URLError as e:
raise PresetError( raise PresetError(
f"Failed to download preset from {download_url}: {e}" f"Failed to download preset from {download_url}: {e}"
) )
except IOError as e: except IOError as e:
raise PresetError(f"Failed to save preset ZIP: {e}") raise PresetError(f"Failed to read preset archive from {download_url}: {e}")
# Choose file extension based on detected format.
if not archive_fmt:
raise PresetError(
f"Could not determine archive format for {download_url}. "
"Ensure the URL points to a .zip or .tar.gz/.tgz file."
)
if archive_fmt == "tar.gz":
archive_filename = f"{pack_id}-{version}.tar.gz"
else:
archive_filename = f"{pack_id}-{version}.zip"
archive_path = target_dir / archive_filename
try:
archive_path.write_bytes(archive_data)
except IOError as e:
raise PresetError(f"Failed to save preset archive: {e}")
return archive_path
def clear_cache(self): def clear_cache(self):
"""Clear all catalog cache files, including per-URL hashed caches.""" """Clear all catalog cache files, including per-URL hashed caches."""

View File

@@ -178,6 +178,47 @@ class TestNormalizePriority:
assert normalize_priority("invalid", default=1) == 1 assert normalize_priority("invalid", default=1) == 1
# ===== detect_archive_format Tests =====
class TestDetectArchiveFormat:
"""Test the detect_archive_format helper."""
def _fmt(self, url, ct=""):
from specify_cli.extensions import detect_archive_format
return detect_archive_format(url, ct)
def test_zip_url_extension(self):
assert self._fmt("https://example.com/ext-1.0.0.zip") == "zip"
def test_tar_gz_url_extension(self):
assert self._fmt("https://example.com/ext-1.0.0.tar.gz") == "tar.gz"
def test_tgz_url_extension(self):
assert self._fmt("https://example.com/ext-1.0.0.tgz") == "tar.gz"
def test_zip_uppercase_url_extension(self):
assert self._fmt("https://example.com/ext.ZIP") == "zip"
def test_tar_gz_with_query_string(self):
assert self._fmt("https://example.com/ext.tar.gz?token=abc") == "tar.gz"
def test_zip_content_type_fallback(self):
assert self._fmt("https://example.com/download", "application/zip") == "zip"
def test_gzip_content_type_fallback(self):
assert self._fmt("https://example.com/download", "application/gzip") == "tar.gz"
def test_x_gzip_content_type_fallback(self):
assert self._fmt("https://example.com/download", "application/x-gzip") == "tar.gz"
def test_unknown_returns_empty_string(self):
assert self._fmt("https://example.com/workflow.yml") == ""
def test_url_extension_takes_precedence_over_content_type(self):
# URL says .zip — content-type claiming gzip should not override.
assert self._fmt("https://example.com/ext.zip", "application/gzip") == "zip"
# ===== ExtensionManifest Tests ===== # ===== ExtensionManifest Tests =====
class TestExtensionManifest: class TestExtensionManifest:
@@ -1013,6 +1054,97 @@ class TestExtensionManager:
assert backup_file.read_text() == "test: config" assert backup_file.read_text() == "test: config"
# ===== install_from_zip Tarball Tests =====
class TestInstallFromTarball:
"""Tests for install_from_zip accepting .tar.gz/.tgz archives."""
def _make_tarball(self, dest: Path, extension_dir: Path, nested: bool = False) -> None:
"""Create a minimal .tar.gz archive from *extension_dir*."""
import tarfile
with tarfile.open(dest, "w:gz") as tf:
for file_path in extension_dir.rglob("*"):
if file_path.is_file():
arcname = file_path.relative_to(extension_dir)
if nested:
arcname = Path("test-ext-v1.0.0") / arcname
tf.add(file_path, arcname=str(arcname))
def test_install_from_tar_gz(self, extension_dir, project_dir, temp_dir):
"""install_from_zip should accept a .tar.gz archive."""
archive = temp_dir / "test-ext-1.0.0.tar.gz"
self._make_tarball(archive, extension_dir)
manager = ExtensionManager(project_dir)
manifest = manager.install_from_zip(archive, "0.1.0")
assert manifest.id == "test-ext"
assert manager.registry.is_installed("test-ext")
def test_install_from_tgz(self, extension_dir, project_dir, temp_dir):
"""install_from_zip should accept a .tgz archive."""
archive = temp_dir / "test-ext-1.0.0.tgz"
self._make_tarball(archive, extension_dir)
manager = ExtensionManager(project_dir)
manifest = manager.install_from_zip(archive, "0.1.0")
assert manifest.id == "test-ext"
assert manager.registry.is_installed("test-ext")
def test_install_from_tar_gz_nested(self, extension_dir, project_dir, temp_dir):
"""install_from_zip should handle a single nested directory inside the tarball."""
archive = temp_dir / "test-ext-nested.tar.gz"
self._make_tarball(archive, extension_dir, nested=True)
manager = ExtensionManager(project_dir)
manifest = manager.install_from_zip(archive, "0.1.0")
assert manifest.id == "test-ext"
assert manager.registry.is_installed("test-ext")
def test_install_from_tar_gz_no_manifest(self, project_dir, temp_dir):
"""install_from_zip raises ValidationError when tarball has no extension.yml."""
import tarfile
import io
archive = temp_dir / "bad.tar.gz"
with tarfile.open(archive, "w:gz") as tf:
data = b"no manifest here"
info = tarfile.TarInfo(name="readme.txt")
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
manager = ExtensionManager(project_dir)
with pytest.raises(ValidationError, match="No extension.yml found"):
manager.install_from_zip(archive, "0.1.0")
def test_install_from_tar_gz_rejects_path_traversal(self, project_dir, temp_dir):
"""install_from_zip must reject tarballs with path traversal entries."""
import tarfile
import io
archive = temp_dir / "evil.tar.gz"
with tarfile.open(archive, "w:gz") as tf:
info = tarfile.TarInfo(name="../../evil.txt")
data = b"evil"
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
manager = ExtensionManager(project_dir)
with pytest.raises(ValidationError, match="Unsafe path"):
manager.install_from_zip(archive, "0.1.0")
def test_install_from_tar_gz_rejects_symlinks(self, project_dir, temp_dir):
"""install_from_zip must reject tarballs containing symlinks."""
import tarfile
archive = temp_dir / "symlink.tar.gz"
with tarfile.open(archive, "w:gz") as tf:
info = tarfile.TarInfo(name="link")
info.type = tarfile.SYMTYPE
info.linkname = "/etc/passwd"
tf.addfile(info)
manager = ExtensionManager(project_dir)
with pytest.raises(ValidationError, match="Symlinks"):
manager.install_from_zip(archive, "0.1.0")
# ===== CommandRegistrar Tests ===== # ===== CommandRegistrar Tests =====
class TestCommandRegistrar: class TestCommandRegistrar:
@@ -2627,6 +2759,7 @@ class TestExtensionCatalog:
mock_response = MagicMock() mock_response = MagicMock()
mock_response.read.return_value = zip_bytes mock_response.read.return_value = zip_bytes
mock_response.geturl.return_value = "https://github.com/org/repo/releases/download/v1/test-ext.zip"
mock_response.__enter__ = lambda s: s mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False) mock_response.__exit__ = MagicMock(return_value=False)
@@ -3529,6 +3662,7 @@ class TestDownloadExtensionBundled:
mock_response = MagicMock() mock_response = MagicMock()
mock_response.read.return_value = b"fake zip data" mock_response.read.return_value = b"fake zip data"
mock_response.geturl.return_value = "https://example.com/git-2.0.0.zip"
mock_response.__enter__ = lambda s: s mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False) mock_response.__exit__ = MagicMock(return_value=False)

View File

@@ -649,6 +649,90 @@ class TestPresetManager:
with pytest.raises(PresetValidationError, match="No preset.yml found"): with pytest.raises(PresetValidationError, match="No preset.yml found"):
manager.install_from_zip(zip_path, "0.1.5") manager.install_from_zip(zip_path, "0.1.5")
def _make_tarball(self, dest, pack_dir, nested=False):
import tarfile
with tarfile.open(dest, "w:gz") as tf:
for file_path in pack_dir.rglob("*"):
if file_path.is_file():
arcname = file_path.relative_to(pack_dir)
if nested:
arcname = Path("test-pack-v1.0.0") / arcname
tf.add(file_path, arcname=str(arcname))
def test_install_from_tar_gz(self, project_dir, pack_dir, temp_dir):
"""Test installing a preset from a .tar.gz archive."""
archive = temp_dir / "test-pack-1.0.tar.gz"
self._make_tarball(archive, pack_dir)
manager = PresetManager(project_dir)
manifest = manager.install_from_zip(archive, "0.1.5")
assert manifest.id == "test-pack"
assert manager.registry.is_installed("test-pack")
def test_install_from_tgz(self, project_dir, pack_dir, temp_dir):
"""Test installing a preset from a .tgz archive."""
archive = temp_dir / "test-pack-1.0.tgz"
self._make_tarball(archive, pack_dir)
manager = PresetManager(project_dir)
manifest = manager.install_from_zip(archive, "0.1.5")
assert manifest.id == "test-pack"
assert manager.registry.is_installed("test-pack")
def test_install_from_tar_gz_nested(self, project_dir, pack_dir, temp_dir):
"""Test installing a preset from a .tar.gz archive with a single nested directory."""
archive = temp_dir / "test-pack-nested.tar.gz"
self._make_tarball(archive, pack_dir, nested=True)
manager = PresetManager(project_dir)
manifest = manager.install_from_zip(archive, "0.1.5")
assert manifest.id == "test-pack"
assert manager.registry.is_installed("test-pack")
def test_install_from_tar_gz_no_manifest(self, project_dir, temp_dir):
"""Test installing a preset from a .tar.gz without preset.yml raises error."""
import tarfile
import io
archive = temp_dir / "bad.tar.gz"
with tarfile.open(archive, "w:gz") as tf:
data = b"no manifest here"
info = tarfile.TarInfo(name="readme.txt")
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
manager = PresetManager(project_dir)
with pytest.raises(PresetValidationError, match="No preset.yml found"):
manager.install_from_zip(archive, "0.1.5")
def test_install_from_tar_gz_rejects_path_traversal(self, project_dir, temp_dir):
"""install_from_zip must reject tarballs with path traversal entries."""
import tarfile
import io
archive = temp_dir / "evil.tar.gz"
with tarfile.open(archive, "w:gz") as tf:
info = tarfile.TarInfo(name="../../evil.txt")
data = b"evil"
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
manager = PresetManager(project_dir)
with pytest.raises(PresetValidationError, match="Unsafe path"):
manager.install_from_zip(archive, "0.1.5")
def test_install_from_tar_gz_rejects_symlinks(self, project_dir, temp_dir):
"""install_from_zip must reject tarballs containing symlinks."""
import tarfile
archive = temp_dir / "symlink.tar.gz"
with tarfile.open(archive, "w:gz") as tf:
info = tarfile.TarInfo(name="link")
info.type = tarfile.SYMTYPE
info.linkname = "/etc/passwd"
tf.addfile(info)
manager = PresetManager(project_dir)
with pytest.raises(PresetValidationError, match="Symlinks"):
manager.install_from_zip(archive, "0.1.5")
def test_remove(self, project_dir, pack_dir): def test_remove(self, project_dir, pack_dir):
"""Test removing a preset.""" """Test removing a preset."""
manager = PresetManager(project_dir) manager = PresetManager(project_dir)
@@ -1529,6 +1613,7 @@ class TestPresetCatalog:
mock_response = MagicMock() mock_response = MagicMock()
mock_response.read.return_value = zip_bytes mock_response.read.return_value = zip_bytes
mock_response.geturl.return_value = "https://github.com/org/repo/releases/download/v1/test-pack.zip"
mock_response.__enter__ = lambda s: s mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False) mock_response.__exit__ = MagicMock(return_value=False)

View File

@@ -1843,3 +1843,230 @@ steps:
assert state.status == RunStatus.COMPLETED assert state.status == RunStatus.COMPLETED
assert "do-plan" in state.step_results assert "do-plan" in state.step_results
assert "do-specify" not in state.step_results assert "do-specify" not in state.step_results
# ===== workflow add archive CLI tests =====
MINIMAL_WORKFLOW_YAML = """\
schema_version: "1.0"
workflow:
id: "arc-workflow"
name: "Archive Workflow"
version: "1.0.0"
description: "Installed from archive"
steps:
- id: step-one
type: shell
run: "echo hello"
"""
class TestWorkflowAddArchive:
"""CLI-level tests for `workflow add` with local archive files."""
@pytest.fixture
def project_dir(self, tmp_path):
"""Create a minimal spec-kit project."""
specify = tmp_path / ".specify"
specify.mkdir()
(specify / "workflows").mkdir()
return tmp_path
def _runner_and_app(self):
from typer.testing import CliRunner
from specify_cli import app
return CliRunner(), app
# -- Local ZIP archive --------------------------------------------------
def test_workflow_add_local_zip_flat(self, project_dir):
"""workflow add installs from a local ZIP with workflow.yml at root."""
import zipfile
from unittest.mock import patch
runner, app = self._runner_and_app()
archive = project_dir / "workflow.zip"
with zipfile.ZipFile(archive, "w") as zf:
zf.writestr("workflow.yml", MINIMAL_WORKFLOW_YAML)
with patch.object(Path, "cwd", return_value=project_dir):
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False)
assert result.exit_code == 0, result.output
assert "arc-workflow" in result.output
installed = project_dir / ".specify" / "workflows" / "arc-workflow" / "workflow.yml"
assert installed.exists()
def test_workflow_add_local_zip_nested(self, project_dir):
"""workflow add installs from a local ZIP with workflow.yml in a subdirectory."""
import zipfile
from unittest.mock import patch
runner, app = self._runner_and_app()
archive = project_dir / "workflow.zip"
with zipfile.ZipFile(archive, "w") as zf:
zf.writestr("repo-1.0/workflow.yml", MINIMAL_WORKFLOW_YAML)
with patch.object(Path, "cwd", return_value=project_dir):
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False)
assert result.exit_code == 0, result.output
assert "arc-workflow" in result.output
def test_workflow_add_local_zip_missing_workflow_yml(self, project_dir):
"""workflow add exits with an error when the ZIP has no workflow.yml."""
import zipfile
from unittest.mock import patch
runner, app = self._runner_and_app()
archive = project_dir / "empty.zip"
with zipfile.ZipFile(archive, "w") as zf:
zf.writestr("README.md", "nothing here")
with patch.object(Path, "cwd", return_value=project_dir):
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=True)
assert result.exit_code != 0
assert "extract" in result.output.lower() or "workflow" in result.output.lower()
# -- Local tar.gz archive -----------------------------------------------
def test_workflow_add_local_tar_gz_flat(self, project_dir):
"""workflow add installs from a local .tar.gz with workflow.yml at root."""
import tarfile, io
from unittest.mock import patch
runner, app = self._runner_and_app()
archive = project_dir / "workflow.tar.gz"
with tarfile.open(archive, "w:gz") as tf:
data = MINIMAL_WORKFLOW_YAML.encode()
info = tarfile.TarInfo(name="workflow.yml")
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
with patch.object(Path, "cwd", return_value=project_dir):
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False)
assert result.exit_code == 0, result.output
assert "arc-workflow" in result.output
installed = project_dir / ".specify" / "workflows" / "arc-workflow" / "workflow.yml"
assert installed.exists()
def test_workflow_add_local_tar_gz_nested(self, project_dir):
"""workflow add installs from a local .tar.gz with workflow.yml in a subdirectory."""
import tarfile, io
from unittest.mock import patch
runner, app = self._runner_and_app()
archive = project_dir / "workflow.tar.gz"
with tarfile.open(archive, "w:gz") as tf:
data = MINIMAL_WORKFLOW_YAML.encode()
info = tarfile.TarInfo(name="repo-1.0/workflow.yml")
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
with patch.object(Path, "cwd", return_value=project_dir):
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False)
assert result.exit_code == 0, result.output
assert "arc-workflow" in result.output
def test_workflow_add_local_tgz_flat(self, project_dir):
"""workflow add recognises the .tgz extension as a gzipped tarball."""
import tarfile, io
from unittest.mock import patch
runner, app = self._runner_and_app()
archive = project_dir / "workflow.tgz"
with tarfile.open(archive, "w:gz") as tf:
data = MINIMAL_WORKFLOW_YAML.encode()
info = tarfile.TarInfo(name="workflow.yml")
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
with patch.object(Path, "cwd", return_value=project_dir):
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False)
assert result.exit_code == 0, result.output
assert "arc-workflow" in result.output
def test_workflow_add_local_tar_gz_missing_workflow_yml(self, project_dir):
"""workflow add exits with an error when the .tar.gz has no workflow.yml."""
import tarfile, io
from unittest.mock import patch
runner, app = self._runner_and_app()
archive = project_dir / "empty.tar.gz"
with tarfile.open(archive, "w:gz") as tf:
data = b"nothing"
info = tarfile.TarInfo(name="README.md")
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
with patch.object(Path, "cwd", return_value=project_dir):
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=True)
assert result.exit_code != 0
assert "extract" in result.output.lower() or "workflow" in result.output.lower()
# -- URL archive download -----------------------------------------------
def test_workflow_add_url_tar_gz(self, project_dir):
"""workflow add downloads a .tar.gz from a URL and installs the workflow."""
import tarfile, io
from unittest.mock import patch, MagicMock
runner, app = self._runner_and_app()
# Build an in-memory tar.gz archive containing workflow.yml.
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:gz") as tf:
data = MINIMAL_WORKFLOW_YAML.encode()
info = tarfile.TarInfo(name="workflow.yml")
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
raw_bytes = buf.getvalue()
mock_resp = MagicMock()
mock_resp.geturl.return_value = "https://example.com/workflow.tar.gz"
mock_resp.headers.get.return_value = "application/gzip"
mock_resp.read.return_value = raw_bytes
mock_resp.__enter__ = lambda s: s
mock_resp.__exit__ = MagicMock(return_value=False)
with patch("urllib.request.urlopen", return_value=mock_resp), \
patch.object(Path, "cwd", return_value=project_dir):
result = runner.invoke(
app, ["workflow", "add", "https://example.com/workflow.tar.gz"],
catch_exceptions=False,
)
assert result.exit_code == 0, result.output
assert "arc-workflow" in result.output
def test_workflow_add_url_zip(self, project_dir):
"""workflow add downloads a .zip from a URL and installs the workflow."""
import zipfile, io
from unittest.mock import patch, MagicMock
runner, app = self._runner_and_app()
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr("workflow.yml", MINIMAL_WORKFLOW_YAML)
raw_bytes = buf.getvalue()
mock_resp = MagicMock()
mock_resp.geturl.return_value = "https://example.com/workflow.zip"
mock_resp.headers.get.return_value = "application/zip"
mock_resp.read.return_value = raw_bytes
mock_resp.__enter__ = lambda s: s
mock_resp.__exit__ = MagicMock(return_value=False)
with patch("urllib.request.urlopen", return_value=mock_resp), \
patch.object(Path, "cwd", return_value=project_dir):
result = runner.invoke(
app, ["workflow", "add", "https://example.com/workflow.zip"],
catch_exceptions=False,
)
assert result.exit_code == 0, result.output
assert "arc-workflow" in result.output