Compare commits

..

1 Commits

Author SHA1 Message Date
github-actions[bot]
48a545f214 chore: bump version to 0.8.2 2026-04-28 18:45:43 +00:00
8 changed files with 123 additions and 1000 deletions

View File

@@ -2,6 +2,28 @@
<!-- insert new changelog below this comment -->
## [0.8.2] - 2026-04-28
### Changed
- Potential fix for pull request finding
- Potential fix for pull request finding
- Potential fix for pull request finding
- Potential fix for pull request finding
- Potential fix for pull request finding
- Potential fix for pull request finding
- Potential fix for pull request finding
- Add MarkItDown Document Converter extension to community catalog (#2390)
- feat: Speckit preset fiction book v1.7 - Support for RAG (Chroma DB) offline semantic search (#2367)
- fix(extensions): use explicit UTF-8 encoding when reading manifest YAML (#2370)
- catalog: add m365 community extension
- docs: replace deprecated --ai flag with --integration in all documentation (#2359)
- feat(extensions,presets): authenticate GitHub-hosted catalog and download requests with GITHUB_TOKEN/GH_TOKEN (#2331)
- Update extensify to v1.1.0 in community catalog (#2337)
- feat(init): deprecate --no-git flag, gate deprecations at v0.10.0 (#2357)
- Add Spec Orchestrator extension to community catalog (#2350)
- chore: release 0.8.1, begin 0.8.2.dev0 development (#2356)
## [0.8.1] - 2026-04-24
### Changed

View File

@@ -1,6 +1,6 @@
[project]
name = "specify-cli"
version = "0.8.2.dev0"
version = "0.8.2"
description = "Specify CLI, part of GitHub Spec Kit. A tool to bootstrap your projects for Spec-Driven Development (SDD)."
requires-python = ">=3.11"
dependencies = [

View File

@@ -2576,7 +2576,7 @@ def preset_list():
@preset_app.command("add")
def preset_add(
preset_id: str = typer.Argument(None, help="Preset ID to install from catalog"),
from_url: str = typer.Option(None, "--from", help="Install from a URL (ZIP or .tar.gz/.tgz archive)"),
from_url: str = typer.Option(None, "--from", help="Install from a URL (ZIP file)"),
dev: str = typer.Option(None, "--dev", help="Install from local directory (development mode)"),
priority: int = typer.Option(10, "--priority", help="Resolution priority (lower = higher precedence, default 10)"),
):
@@ -2629,46 +2629,17 @@ def preset_add(
import urllib.request
import urllib.error
import tempfile
from .extensions import detect_archive_format as _det_fmt
with tempfile.TemporaryDirectory() as tmpdir:
final_url = from_url
archive_fmt = ""
zip_path = Path(tmpdir) / "preset.zip"
try:
with urllib.request.urlopen(from_url, timeout=60) as response:
final_url = response.geturl()
# Re-validate scheme after any redirect (scheme-downgrade
# guard). Check BEFORE reading the body so an insecure
# redirect cannot cause us to fetch the payload.
_fp = _urlparse(final_url)
_fl = _fp.hostname in ("localhost", "127.0.0.1", "::1")
if _fp.scheme != "https" and not (_fp.scheme == "http" and _fl):
console.print(f"[red]Error:[/red] URL was redirected to a non-HTTPS URL: {final_url}")
raise typer.Exit(1)
content_type = response.headers.get("Content-Type", "")
# Prefer the post-redirect URL for format detection;
# fall back to the original URL only as a last hint.
archive_fmt = _det_fmt(final_url, content_type)
if not archive_fmt:
archive_fmt = _det_fmt(from_url)
archive_data = response.read()
zip_path.write_bytes(response.read())
except urllib.error.URLError as e:
console.print(f"[red]Error:[/red] Failed to download: {e}")
raise typer.Exit(1)
if not archive_fmt:
console.print("[red]Error:[/red] Could not determine archive format from URL or Content-Type.")
console.print("Ensure the URL points to a .zip or .tar.gz/.tgz file.")
raise typer.Exit(1)
suffix = ".tar.gz" if archive_fmt == "tar.gz" else ".zip"
archive_path = Path(tmpdir) / f"preset{suffix}"
try:
archive_path.write_bytes(archive_data)
manifest = manager.install_from_zip(archive_path, speckit_version, priority)
except OSError as e:
console.print(f"[red]Error:[/red] Failed to save or install archive: {e}")
raise typer.Exit(1)
manifest = manager.install_from_zip(zip_path, speckit_version, priority)
console.print(f"[green]✓[/green] Preset '{manifest.name}' v{manifest.version} installed (priority {priority})")
@@ -3602,7 +3573,7 @@ def catalog_remove(
def extension_add(
extension: str = typer.Argument(help="Extension name or path"),
dev: bool = typer.Option(False, "--dev", help="Install from local directory"),
from_url: Optional[str] = typer.Option(None, "--from", help="Install from custom URL (ZIP or .tar.gz/.tgz archive)"),
from_url: Optional[str] = typer.Option(None, "--from", help="Install from custom URL"),
priority: int = typer.Option(10, "--priority", help="Resolution priority (lower = higher precedence, default 10)"),
):
"""Install an extension."""
@@ -3641,11 +3612,10 @@ def extension_add(
manifest = manager.install_from_directory(source_path, speckit_version, priority=priority)
elif from_url:
# Install from URL (ZIP or tar.gz archive)
# Install from URL (ZIP file)
import urllib.request
import urllib.error
from urllib.parse import urlparse
from .extensions import detect_archive_format
# Validate URL
parsed = urlparse(from_url)
@@ -3661,53 +3631,25 @@ def extension_add(
console.print("Only install extensions from sources you trust.\n")
console.print(f"Downloading from {from_url}...")
# Download archive to temp location; detect format from the
# post-redirect URL (with Content-Type fallback), only using
# the original URL as a last hint.
# Download ZIP to temp location
download_dir = project_root / ".specify" / "extensions" / ".cache" / "downloads"
download_dir.mkdir(parents=True, exist_ok=True)
archive_fmt = ""
archive_path = None
zip_path = download_dir / f"{extension}-url-download.zip"
try:
with urllib.request.urlopen(from_url, timeout=60) as response:
final_url = response.geturl()
# Re-validate scheme after any redirect (scheme-downgrade
# guard). Check BEFORE reading the body so an insecure
# redirect cannot cause us to fetch the payload.
_fp = urlparse(final_url)
_fl = _fp.hostname in ("localhost", "127.0.0.1", "::1")
if _fp.scheme != "https" and not (_fp.scheme == "http" and _fl):
console.print(f"[red]Error:[/red] URL was redirected to a non-HTTPS URL: {final_url}")
raise typer.Exit(1)
content_type = response.headers.get("Content-Type", "")
archive_fmt = detect_archive_format(final_url, content_type)
if not archive_fmt:
archive_fmt = detect_archive_format(from_url)
archive_data = response.read()
zip_data = response.read()
zip_path.write_bytes(zip_data)
if not archive_fmt:
console.print("[red]Error:[/red] Could not determine archive format from URL or Content-Type.")
console.print("Ensure the URL points to a .zip or .tar.gz/.tgz file.")
raise typer.Exit(1)
suffix = ".tar.gz" if archive_fmt == "tar.gz" else ".zip"
safe_name = Path(extension).name or "extension"
archive_path = download_dir / f"{safe_name}-url-download{suffix}"
archive_path.write_bytes(archive_data)
# Install from downloaded archive
manifest = manager.install_from_zip(archive_path, speckit_version, priority=priority)
# Install from downloaded ZIP
manifest = manager.install_from_zip(zip_path, speckit_version, priority=priority)
except urllib.error.URLError as e:
console.print(f"[red]Error:[/red] Failed to download from {from_url}: {e}")
raise typer.Exit(1)
except OSError as e:
console.print(f"[red]Error:[/red] Failed to save or install archive: {e}")
raise typer.Exit(1)
finally:
# Clean up the downloaded archive
if archive_path is not None and archive_path.exists():
archive_path.unlink()
# Clean up downloaded ZIP
if zip_path.exists():
zip_path.unlink()
else:
# Try bundled extensions first (shipped with spec-kit)
@@ -4359,55 +4301,29 @@ def extension_update(
backup_hooks[hook_name] = ext_hooks
# 5. Download new version
archive_path = catalog.download_extension(extension_id)
zip_path = catalog.download_extension(extension_id)
try:
# 6. Validate extension ID from archive BEFORE modifying installation
# Handle both root-level and nested extension.yml (GitHub auto-generated archives)
from .extensions import detect_archive_format
import tarfile
archive_fmt = detect_archive_format(str(archive_path))
import yaml
manifest_data = None
# 6. Validate extension ID from ZIP BEFORE modifying installation
# Handle both root-level and nested extension.yml (GitHub auto-generated ZIPs)
with zipfile.ZipFile(zip_path, "r") as zf:
import yaml
manifest_data = None
namelist = zf.namelist()
if archive_fmt == "tar.gz":
with tarfile.open(archive_path, "r:gz") as tf:
# First try root-level extension.yml
try:
m = tf.getmember("extension.yml")
f = tf.extractfile(m)
if f is not None:
with f:
manifest_data = yaml.safe_load(f.read()) or {}
except KeyError:
# extension.yml not present at archive root; use nested fallback below.
manifest_data = None
# Fall back to nested-directory search if root-level
# was missing (KeyError) or not a regular file (None).
if manifest_data is None:
members = [m for m in tf.getmembers() if m.name.endswith("/extension.yml") and m.name.count("/") == 1]
if len(members) == 1:
f = tf.extractfile(members[0])
if f is not None:
with f:
manifest_data = yaml.safe_load(f.read()) or {}
else:
with zipfile.ZipFile(archive_path, "r") as zf:
namelist = zf.namelist()
# First try root-level extension.yml
if "extension.yml" in namelist:
with zf.open("extension.yml") as f:
# First try root-level extension.yml
if "extension.yml" in namelist:
with zf.open("extension.yml") as f:
manifest_data = yaml.safe_load(f) or {}
else:
# Look for extension.yml in a single top-level subdirectory
# (e.g., "repo-name-branch/extension.yml")
manifest_paths = [n for n in namelist if n.endswith("/extension.yml") and n.count("/") == 1]
if len(manifest_paths) == 1:
with zf.open(manifest_paths[0]) as f:
manifest_data = yaml.safe_load(f) or {}
else:
# Look for extension.yml in a single top-level subdirectory
# (e.g., "repo-name-branch/extension.yml")
manifest_paths = [n for n in namelist if n.endswith("/extension.yml") and n.count("/") == 1]
if len(manifest_paths) == 1:
with zf.open(manifest_paths[0]) as f:
manifest_data = yaml.safe_load(f) or {}
if manifest_data is None:
raise ValueError("Downloaded extension archive is missing 'extension.yml'")
if manifest_data is None:
raise ValueError("Downloaded extension archive is missing 'extension.yml'")
zip_extension_id = manifest_data.get("extension", {}).get("id")
if zip_extension_id != extension_id:
@@ -4419,7 +4335,7 @@ def extension_update(
manager.remove(extension_id, keep_config=True)
# 8. Install new version
_ = manager.install_from_zip(archive_path, speckit_version)
_ = manager.install_from_zip(zip_path, speckit_version)
# Restore user config files from backup after successful install.
new_extension_dir = manager.extensions_dir / extension_id
@@ -4465,9 +4381,9 @@ def extension_update(
hook["enabled"] = False
hook_executor.save_project_config(config)
finally:
# Clean up downloaded archive
if archive_path.exists():
archive_path.unlink()
# Clean up downloaded ZIP
if zip_path.exists():
zip_path.unlink()
# 10. Clean up backup on success
if backup_base.exists():
@@ -4959,59 +4875,6 @@ def workflow_list():
console.print()
def _extract_workflow_yml(archive_path: Path, archive_fmt: str) -> bytes:
"""Extract ``workflow.yml`` from a ZIP or ``.tar.gz`` archive.
Searches the archive root and a single nested top-level subdirectory
(e.g., ``repo-name-1.0/workflow.yml``).
Args:
archive_path: Path to the downloaded archive.
archive_fmt: ``"zip"`` or ``"tar.gz"``.
Returns:
Raw bytes of the ``workflow.yml`` file.
Raises:
ValueError: If no ``workflow.yml`` is found in the archive.
"""
import tarfile
if archive_fmt == "tar.gz":
with tarfile.open(archive_path, "r:gz") as tf:
# Try root-level first.
try:
f = tf.extractfile(tf.getmember("workflow.yml"))
if f is not None:
with f:
return f.read()
except KeyError:
pass # Root-level workflow.yml not found; fall through to subdirectory search below.
# Look in a single top-level subdirectory.
candidates = [
m for m in tf.getmembers()
if m.name.endswith("/workflow.yml") and m.name.count("/") == 1
]
if len(candidates) == 1:
f = tf.extractfile(candidates[0])
if f is not None:
with f:
return f.read()
else:
with zipfile.ZipFile(archive_path, "r") as zf:
namelist = zf.namelist()
if "workflow.yml" in namelist:
return zf.read("workflow.yml")
candidates = [
n for n in namelist
if n.endswith("/workflow.yml") and n.count("/") == 1
]
if len(candidates) == 1:
return zf.read(candidates[0])
raise ValueError("No workflow.yml found in the downloaded archive")
@workflow_app.command("add")
def workflow_add(
source: str = typer.Argument(..., help="Workflow ID, URL, or local path"),
@@ -5065,7 +4928,6 @@ def workflow_add(
from ipaddress import ip_address
from urllib.parse import urlparse
from urllib.request import urlopen # noqa: S310
from .extensions import detect_archive_format
parsed_src = urlparse(source)
src_host = parsed_src.hostname or ""
@@ -5096,53 +4958,18 @@ def workflow_add(
if final_parsed.scheme != "https" and not (final_parsed.scheme == "http" and final_lb):
console.print(f"[red]Error:[/red] URL redirected to non-HTTPS: {final_url}")
raise typer.Exit(1)
# Detect archive format from the final URL or Content-Type header.
archive_fmt = detect_archive_format(final_url)
if not archive_fmt:
content_type = resp.headers.get("Content-Type", "")
archive_fmt = detect_archive_format(final_url, content_type)
raw_data = resp.read()
with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp:
tmp.write(resp.read())
tmp_path = Path(tmp.name)
except typer.Exit:
raise
except Exception as exc:
console.print(f"[red]Error:[/red] Failed to download workflow: {exc}")
raise typer.Exit(1)
tmp_path = None
try:
if archive_fmt in ("tar.gz", "zip"):
# Extract workflow.yml from the archive.
suffix = ".tar.gz" if archive_fmt == "tar.gz" else ".zip"
arc_tmp_path = None
with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as arc_tmp:
arc_tmp_path = Path(arc_tmp.name)
arc_tmp.write(raw_data)
try:
wf_yaml = _extract_workflow_yml(arc_tmp_path, archive_fmt)
with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp:
tmp_path = Path(tmp.name)
tmp.write(wf_yaml)
finally:
if arc_tmp_path is not None:
arc_tmp_path.unlink(missing_ok=True)
else:
# Treat as a plain YAML file (existing behaviour).
with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp:
tmp.write(raw_data)
tmp_path = Path(tmp.name)
except typer.Exit:
raise
except Exception as exc:
console.print(f"[red]Error:[/red] Failed to process downloaded workflow: {exc}")
raise typer.Exit(1)
try:
_validate_and_install_local(tmp_path, source)
finally:
if tmp_path is not None:
tmp_path.unlink(missing_ok=True)
tmp_path.unlink(missing_ok=True)
return
# Try as a local file/directory
@@ -5151,27 +4978,6 @@ def workflow_add(
if source_path.is_file() and source_path.suffix in (".yml", ".yaml"):
_validate_and_install_local(source_path, str(source_path))
return
elif source_path.is_file() and (
source.lower().endswith(".tar.gz") or source.lower().endswith(".tgz") or source.lower().endswith(".zip")
):
# Local archive file containing workflow.yml
from .extensions import detect_archive_format
local_fmt = detect_archive_format(source)
try:
wf_yaml = _extract_workflow_yml(source_path, local_fmt)
except Exception as exc:
console.print(f"[red]Error:[/red] Failed to extract workflow from archive: {exc}")
raise typer.Exit(1)
import tempfile
tmp_local = None
with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp:
tmp_local = Path(tmp.name)
tmp.write(wf_yaml)
try:
_validate_and_install_local(tmp_local, str(source_path))
finally:
tmp_local.unlink(missing_ok=True)
return
elif source_path.is_dir():
wf_file = source_path / "workflow.yml"
if not wf_file.exists():
@@ -5235,7 +5041,6 @@ def workflow_add(
try:
from urllib.request import urlopen # noqa: S310 — URL comes from catalog
from .extensions import detect_archive_format
workflow_dir.mkdir(parents=True, exist_ok=True)
with urlopen(workflow_url, timeout=30) as response: # noqa: S310
@@ -5258,32 +5063,7 @@ def workflow_add(
f"[red]Error:[/red] Workflow '{source}' redirected to non-HTTPS URL: {final_url}"
)
raise typer.Exit(1)
# Detect archive format from the final URL or Content-Type header.
cat_archive_fmt = detect_archive_format(final_url)
if not cat_archive_fmt:
cat_ct = response.headers.get("Content-Type", "")
cat_archive_fmt = detect_archive_format(final_url, cat_ct)
raw_response = response.read()
if cat_archive_fmt in ("tar.gz", "zip"):
# Download URL points to an archive — extract workflow.yml from it.
suffix = ".tar.gz" if cat_archive_fmt == "tar.gz" else ".zip"
arc_tmp = None
with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as arc_f:
arc_tmp = Path(arc_f.name)
arc_f.write(raw_response)
try:
wf_yaml_bytes = _extract_workflow_yml(arc_tmp, cat_archive_fmt)
finally:
if arc_tmp is not None:
arc_tmp.unlink(missing_ok=True)
workflow_file.write_bytes(wf_yaml_bytes)
else:
workflow_file.write_bytes(raw_response)
except typer.Exit:
raise
workflow_file.write_bytes(response.read())
except Exception as exc:
if workflow_dir.exists():
import shutil

View File

@@ -9,8 +9,6 @@ without bloating the core framework.
import json
import hashlib
import os
import sys
import tarfile
import tempfile
import zipfile
import shutil
@@ -108,137 +106,6 @@ def normalize_priority(value: Any, default: int = 10) -> int:
return priority if priority >= 1 else default
def detect_archive_format(url: str, content_type: str = "") -> str:
"""Detect archive format from URL path extension or Content-Type header.
Args:
url: URL or file path to inspect.
content_type: Optional ``Content-Type`` header value from the HTTP response.
Returns:
``"zip"`` for ZIP archives, ``"tar.gz"`` for gzipped tarballs, or ``""``
when the format cannot be determined.
"""
# Strip query-string / fragment before examining the path extension.
url_path = url.split("?")[0].split("#")[0].lower()
if url_path.endswith(".zip"):
return "zip"
if url_path.endswith(".tar.gz") or url_path.endswith(".tgz"):
return "tar.gz"
# Fall back to Content-Type header inspection.
ct = content_type.lower()
if "application/zip" in ct or "application/x-zip" in ct:
return "zip"
if any(
t in ct
for t in (
"application/gzip",
"application/x-gzip",
"application/x-tar+gzip",
)
):
return "tar.gz"
return ""
def safe_extract_tarball(
archive_path: Path,
dest_dir: Path,
error_class: "type[Exception]" = Exception,
) -> None:
"""Safely extract a ``.tar.gz`` or ``.tgz`` archive into *dest_dir*.
All members are validated before extraction to prevent *tar slip*
(path traversal) attacks. Symlinks, hard links, and special files
(devices, FIFOs, etc.) are rejected.
On Python 3.12 and later the ``"data"`` extraction filter is applied
for an additional layer of OS-level protection. On earlier versions
the explicit member list (containing only pre-validated regular files
and directories) is passed to ``extractall()`` — since all symlinks are
already rejected in the validation phase, no archive-introduced symlink
can be followed during extraction.
Args:
archive_path: Path to the ``.tar.gz``/``.tgz`` archive.
dest_dir: Destination directory (must already exist).
error_class: Exception class to raise on unsafe entries.
Raises:
error_class: If any member is unsafe or the archive cannot be read.
"""
dest_resolved = dest_dir.resolve()
# Tar metadata member types to skip during validation — they carry no
# extractable payload and are generated automatically by many common
# archiving tools (e.g. PAX headers, GNU longname/longlink entries).
# GNUTYPE_SPARSE is intentionally excluded: it carries a real file payload
# and tarfile.TarInfo.isreg() returns True for it, so it passes the
# regular-file check below and is extracted correctly.
_TAR_METADATA_TYPES = (
tarfile.XHDTYPE, # PAX extended header
tarfile.XGLTYPE, # PAX global extended header
tarfile.SOLARIS_XHDTYPE, # Solaris PAX extended header
tarfile.GNUTYPE_LONGNAME, # GNU long path name (metadata only)
tarfile.GNUTYPE_LONGLINK, # GNU long link name (metadata only)
)
try:
with tarfile.open(archive_path, "r:gz") as tf:
members = tf.getmembers()
safe_members = []
# Validate every member before extracting anything.
for member in members:
# Reject absolute paths and any path component that is "..".
if os.path.isabs(member.name) or any(
part == ".." for part in member.name.replace("\\", "/").split("/")
):
raise error_class(
f"Unsafe path in tar archive: {member.name} (potential path traversal)"
)
# Confirm the resolved path stays inside dest_dir.
member_path = (dest_dir / member.name).resolve()
try:
member_path.relative_to(dest_resolved)
except ValueError:
raise error_class(
f"Unsafe path in tar archive: {member.name} (potential path traversal)"
)
# Skip tar metadata members — they carry no extractable payload.
if member.type in _TAR_METADATA_TYPES:
continue
# Reject symlinks and hard links.
if member.issym() or member.islnk():
raise error_class(
f"Symlinks are not allowed in archive: {member.name}"
)
# Reject devices, FIFOs and other special file types.
if not (member.isreg() or member.isdir()):
raise error_class(
f"Non-regular file in archive: {member.name}"
)
safe_members.append(member)
# Extract — use the "data" filter on Python 3.12+ for extra hardening.
# On all versions pass only the pre-validated members so that no
# unvetted entry (added concurrently or via a race) slips through.
if sys.version_info >= (3, 12):
tf.extractall(dest_dir, members=safe_members, filter="data") # type: ignore[call-arg]
else:
tf.extractall(dest_dir, members=safe_members) # noqa: S202 — validated above
except error_class:
raise
except (tarfile.TarError, OSError) as e:
raise error_class(f"Failed to read archive {archive_path}: {e}") from e
@dataclass
class CatalogEntry:
"""Represents a single catalog entry in the catalog stack."""
@@ -1335,10 +1202,10 @@ class ExtensionManager:
speckit_version: str,
priority: int = 10,
) -> ExtensionManifest:
"""Install extension from a ZIP or ``.tar.gz``/``.tgz`` archive.
"""Install extension from ZIP file.
Args:
zip_path: Path to the extension archive (ZIP or gzipped tarball).
zip_path: Path to extension ZIP file
speckit_version: Current spec-kit version
priority: Resolution priority (lower = higher precedence, default 10)
@@ -1346,8 +1213,7 @@ class ExtensionManager:
Installed extension manifest
Raises:
ValidationError: If manifest is invalid, the archive is unsafe, or
priority is invalid
ValidationError: If manifest is invalid or priority is invalid
CompatibilityError: If extension is incompatible
"""
# Validate priority early
@@ -1357,27 +1223,21 @@ class ExtensionManager:
with tempfile.TemporaryDirectory() as tmpdir:
temp_path = Path(tmpdir)
archive_fmt = detect_archive_format(str(zip_path))
if archive_fmt == "tar.gz":
# Extract tarball safely (prevent tar slip attack)
safe_extract_tarball(zip_path, temp_path, ValidationError)
else:
# Extract ZIP safely (prevent Zip Slip attack)
with zipfile.ZipFile(zip_path, 'r') as zf:
# Validate all paths first before extracting anything
temp_path_resolved = temp_path.resolve()
for member in zf.namelist():
member_path = (temp_path / member).resolve()
# Use is_relative_to for safe path containment check
try:
member_path.relative_to(temp_path_resolved)
except ValueError:
raise ValidationError(
f"Unsafe path in ZIP archive: {member} (potential path traversal)"
)
# Only extract after all paths are validated
zf.extractall(temp_path)
# Extract ZIP safely (prevent Zip Slip attack)
with zipfile.ZipFile(zip_path, 'r') as zf:
# Validate all paths first before extracting anything
temp_path_resolved = temp_path.resolve()
for member in zf.namelist():
member_path = (temp_path / member).resolve()
# Use is_relative_to for safe path containment check
try:
member_path.relative_to(temp_path_resolved)
except ValueError:
raise ValidationError(
f"Unsafe path in ZIP archive: {member} (potential path traversal)"
)
# Only extract after all paths are validated
zf.extractall(temp_path)
# Find extension directory (may be nested)
extension_dir = temp_path
@@ -1391,7 +1251,7 @@ class ExtensionManager:
manifest_path = extension_dir / "extension.yml"
if not manifest_path.exists():
raise ValidationError("No extension.yml found in archive")
raise ValidationError("No extension.yml found in ZIP file")
# Install from extracted directory
return self.install_from_directory(extension_dir, speckit_version, priority=priority)
@@ -2105,18 +1965,14 @@ class ExtensionCatalog:
return None
def download_extension(self, extension_id: str, target_dir: Optional[Path] = None) -> Path:
"""Download extension archive from catalog.
Supports both ZIP (``.zip``) and gzipped tarball (``.tar.gz``/``.tgz``)
archives. The format is detected from the download URL's path extension;
when ambiguous the ``Content-Type`` header is used as a fallback.
"""Download extension ZIP from catalog.
Args:
extension_id: ID of the extension to download
target_dir: Directory to save the archive (defaults to cache directory)
target_dir: Directory to save ZIP file (defaults to temp directory)
Returns:
Path to downloaded archive file
Path to downloaded ZIP file
Raises:
ExtensionError: If extension not found or download fails
@@ -2155,60 +2011,21 @@ class ExtensionCatalog:
target_dir.mkdir(parents=True, exist_ok=True)
version = ext_info.get("version", "unknown")
zip_filename = f"{extension_id}-{version}.zip"
zip_path = target_dir / zip_filename
# Download the archive. Determine the archive format from the
# post-redirect URL first (with Content-Type fallback); only use the
# original `download_url` as a last hint if the final URL gives no
# signal.
final_url = download_url
archive_fmt = ""
# Download the ZIP file
try:
with self._open_url(download_url, timeout=60) as response:
final_url = response.geturl()
# Re-validate scheme after any redirect to guard against
# scheme-downgrade. Validate BEFORE reading the body so a
# malicious redirect cannot cause us to fetch the payload
# over an insecure scheme.
_final_parsed = urlparse(final_url)
_final_is_localhost = _final_parsed.hostname in (
"localhost",
"127.0.0.1",
"::1",
)
if _final_parsed.scheme != "https" and not (
_final_parsed.scheme == "http" and _final_is_localhost
):
raise ExtensionError(
f"Extension download URL was redirected to a non-HTTPS URL: {final_url}"
)
content_type = response.headers.get("Content-Type", "")
archive_fmt = detect_archive_format(final_url, content_type)
if not archive_fmt:
archive_fmt = detect_archive_format(download_url)
archive_data = response.read()
zip_data = response.read()
zip_path.write_bytes(zip_data)
return zip_path
except urllib.error.URLError as e:
raise ExtensionError(f"Failed to download extension from {download_url}: {e}")
except IOError as e:
raise ExtensionError(f"Failed to read extension archive from {download_url}: {e}")
# Choose file extension based on detected format.
if not archive_fmt:
raise ExtensionError(
f"Could not determine archive format for {download_url}. "
"Ensure the URL points to a .zip or .tar.gz/.tgz file."
)
if archive_fmt == "tar.gz":
archive_filename = f"{extension_id}-{version}.tar.gz"
else:
archive_filename = f"{extension_id}-{version}.zip"
archive_path = target_dir / archive_filename
try:
archive_path.write_bytes(archive_data)
except IOError as e:
raise ExtensionError(f"Failed to save extension archive: {e}")
return archive_path
raise ExtensionError(f"Failed to save extension ZIP: {e}")
def clear_cache(self):
"""Clear the catalog cache (both legacy and URL-hash-based files)."""

View File

@@ -27,7 +27,7 @@ import yaml
from packaging import version as pkg_version
from packaging.specifiers import SpecifierSet, InvalidSpecifier
from .extensions import ExtensionRegistry, normalize_priority, detect_archive_format, safe_extract_tarball
from .extensions import ExtensionRegistry, normalize_priority
def _substitute_core_template(
@@ -1604,10 +1604,10 @@ class PresetManager:
speckit_version: str,
priority: int = 10,
) -> PresetManifest:
"""Install preset from a ZIP or ``.tar.gz``/``.tgz`` archive.
"""Install preset from ZIP file.
Args:
zip_path: Path to the preset archive (ZIP or gzipped tarball).
zip_path: Path to preset ZIP file
speckit_version: Current spec-kit version
priority: Resolution priority (lower = higher precedence, default 10)
@@ -1615,8 +1615,7 @@ class PresetManager:
Installed preset manifest
Raises:
PresetValidationError: If manifest is invalid, the archive is unsafe,
or priority is invalid
PresetValidationError: If manifest is invalid or priority is invalid
PresetCompatibilityError: If pack is incompatible
"""
# Validate priority early
@@ -1626,24 +1625,18 @@ class PresetManager:
with tempfile.TemporaryDirectory() as tmpdir:
temp_path = Path(tmpdir)
archive_fmt = detect_archive_format(str(zip_path))
if archive_fmt == "tar.gz":
# Extract tarball safely (prevent tar slip attack)
safe_extract_tarball(zip_path, temp_path, PresetValidationError)
else:
with zipfile.ZipFile(zip_path, 'r') as zf:
temp_path_resolved = temp_path.resolve()
for member in zf.namelist():
member_path = (temp_path / member).resolve()
try:
member_path.relative_to(temp_path_resolved)
except ValueError:
raise PresetValidationError(
f"Unsafe path in ZIP archive: {member} "
"(potential path traversal)"
)
zf.extractall(temp_path)
with zipfile.ZipFile(zip_path, 'r') as zf:
temp_path_resolved = temp_path.resolve()
for member in zf.namelist():
member_path = (temp_path / member).resolve()
try:
member_path.relative_to(temp_path_resolved)
except ValueError:
raise PresetValidationError(
f"Unsafe path in ZIP archive: {member} "
"(potential path traversal)"
)
zf.extractall(temp_path)
pack_dir = temp_path
manifest_path = pack_dir / "preset.yml"
@@ -1656,7 +1649,7 @@ class PresetManager:
if not manifest_path.exists():
raise PresetValidationError(
"No preset.yml found in archive"
"No preset.yml found in ZIP file"
)
return self.install_from_directory(pack_dir, speckit_version, priority)
@@ -2249,18 +2242,14 @@ class PresetCatalog:
def download_pack(
self, pack_id: str, target_dir: Optional[Path] = None
) -> Path:
"""Download preset archive from catalog.
Supports both ZIP (``.zip``) and gzipped tarball (``.tar.gz``/``.tgz``)
archives. The format is detected from the download URL's path extension;
when ambiguous the ``Content-Type`` header is used as a fallback.
"""Download preset ZIP from catalog.
Args:
pack_id: ID of the preset to download
target_dir: Directory to save the archive (defaults to cache directory)
target_dir: Directory to save ZIP file (defaults to cache directory)
Returns:
Path to downloaded archive file
Path to downloaded ZIP file
Raises:
PresetError: If pack not found or download fails
@@ -2312,61 +2301,22 @@ class PresetCatalog:
target_dir.mkdir(parents=True, exist_ok=True)
version = pack_info.get("version", "unknown")
zip_filename = f"{pack_id}-{version}.zip"
zip_path = target_dir / zip_filename
# Determine the archive format from the post-redirect URL first
# (with Content-Type fallback); only use the original `download_url`
# as a last hint if the final URL gives no signal.
final_url = download_url
archive_fmt = ""
try:
with self._open_url(download_url, timeout=60) as response:
final_url = response.geturl()
# Re-validate scheme after any redirect to guard against
# scheme-downgrade. Validate BEFORE reading the body so a
# malicious redirect cannot cause us to fetch the payload
# over an insecure scheme.
_final_parsed = urlparse(final_url)
_final_is_localhost = _final_parsed.hostname in (
"localhost",
"127.0.0.1",
"::1",
)
if _final_parsed.scheme != "https" and not (
_final_parsed.scheme == "http" and _final_is_localhost
):
raise PresetError(
f"Preset download URL was redirected to a non-HTTPS URL: {final_url}"
)
content_type = response.headers.get("Content-Type", "")
archive_fmt = detect_archive_format(final_url, content_type)
if not archive_fmt:
archive_fmt = detect_archive_format(download_url)
archive_data = response.read()
zip_data = response.read()
zip_path.write_bytes(zip_data)
return zip_path
except urllib.error.URLError as e:
raise PresetError(
f"Failed to download preset from {download_url}: {e}"
)
except IOError as e:
raise PresetError(f"Failed to read preset archive from {download_url}: {e}")
# Choose file extension based on detected format.
if not archive_fmt:
raise PresetError(
f"Could not determine archive format for {download_url}. "
"Ensure the URL points to a .zip or .tar.gz/.tgz file."
)
if archive_fmt == "tar.gz":
archive_filename = f"{pack_id}-{version}.tar.gz"
else:
archive_filename = f"{pack_id}-{version}.zip"
archive_path = target_dir / archive_filename
try:
archive_path.write_bytes(archive_data)
except IOError as e:
raise PresetError(f"Failed to save preset archive: {e}")
return archive_path
raise PresetError(f"Failed to save preset ZIP: {e}")
def clear_cache(self):
"""Clear all catalog cache files, including per-URL hashed caches."""

View File

@@ -178,47 +178,6 @@ class TestNormalizePriority:
assert normalize_priority("invalid", default=1) == 1
# ===== detect_archive_format Tests =====
class TestDetectArchiveFormat:
"""Test the detect_archive_format helper."""
def _fmt(self, url, ct=""):
from specify_cli.extensions import detect_archive_format
return detect_archive_format(url, ct)
def test_zip_url_extension(self):
assert self._fmt("https://example.com/ext-1.0.0.zip") == "zip"
def test_tar_gz_url_extension(self):
assert self._fmt("https://example.com/ext-1.0.0.tar.gz") == "tar.gz"
def test_tgz_url_extension(self):
assert self._fmt("https://example.com/ext-1.0.0.tgz") == "tar.gz"
def test_zip_uppercase_url_extension(self):
assert self._fmt("https://example.com/ext.ZIP") == "zip"
def test_tar_gz_with_query_string(self):
assert self._fmt("https://example.com/ext.tar.gz?token=abc") == "tar.gz"
def test_zip_content_type_fallback(self):
assert self._fmt("https://example.com/download", "application/zip") == "zip"
def test_gzip_content_type_fallback(self):
assert self._fmt("https://example.com/download", "application/gzip") == "tar.gz"
def test_x_gzip_content_type_fallback(self):
assert self._fmt("https://example.com/download", "application/x-gzip") == "tar.gz"
def test_unknown_returns_empty_string(self):
assert self._fmt("https://example.com/workflow.yml") == ""
def test_url_extension_takes_precedence_over_content_type(self):
# URL says .zip — content-type claiming gzip should not override.
assert self._fmt("https://example.com/ext.zip", "application/gzip") == "zip"
# ===== ExtensionManifest Tests =====
class TestExtensionManifest:
@@ -1054,97 +1013,6 @@ class TestExtensionManager:
assert backup_file.read_text() == "test: config"
# ===== install_from_zip Tarball Tests =====
class TestInstallFromTarball:
"""Tests for install_from_zip accepting .tar.gz/.tgz archives."""
def _make_tarball(self, dest: Path, extension_dir: Path, nested: bool = False) -> None:
"""Create a minimal .tar.gz archive from *extension_dir*."""
import tarfile
with tarfile.open(dest, "w:gz") as tf:
for file_path in extension_dir.rglob("*"):
if file_path.is_file():
arcname = file_path.relative_to(extension_dir)
if nested:
arcname = Path("test-ext-v1.0.0") / arcname
tf.add(file_path, arcname=str(arcname))
def test_install_from_tar_gz(self, extension_dir, project_dir, temp_dir):
"""install_from_zip should accept a .tar.gz archive."""
archive = temp_dir / "test-ext-1.0.0.tar.gz"
self._make_tarball(archive, extension_dir)
manager = ExtensionManager(project_dir)
manifest = manager.install_from_zip(archive, "0.1.0")
assert manifest.id == "test-ext"
assert manager.registry.is_installed("test-ext")
def test_install_from_tgz(self, extension_dir, project_dir, temp_dir):
"""install_from_zip should accept a .tgz archive."""
archive = temp_dir / "test-ext-1.0.0.tgz"
self._make_tarball(archive, extension_dir)
manager = ExtensionManager(project_dir)
manifest = manager.install_from_zip(archive, "0.1.0")
assert manifest.id == "test-ext"
assert manager.registry.is_installed("test-ext")
def test_install_from_tar_gz_nested(self, extension_dir, project_dir, temp_dir):
"""install_from_zip should handle a single nested directory inside the tarball."""
archive = temp_dir / "test-ext-nested.tar.gz"
self._make_tarball(archive, extension_dir, nested=True)
manager = ExtensionManager(project_dir)
manifest = manager.install_from_zip(archive, "0.1.0")
assert manifest.id == "test-ext"
assert manager.registry.is_installed("test-ext")
def test_install_from_tar_gz_no_manifest(self, project_dir, temp_dir):
"""install_from_zip raises ValidationError when tarball has no extension.yml."""
import tarfile
import io
archive = temp_dir / "bad.tar.gz"
with tarfile.open(archive, "w:gz") as tf:
data = b"no manifest here"
info = tarfile.TarInfo(name="readme.txt")
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
manager = ExtensionManager(project_dir)
with pytest.raises(ValidationError, match="No extension.yml found"):
manager.install_from_zip(archive, "0.1.0")
def test_install_from_tar_gz_rejects_path_traversal(self, project_dir, temp_dir):
"""install_from_zip must reject tarballs with path traversal entries."""
import tarfile
import io
archive = temp_dir / "evil.tar.gz"
with tarfile.open(archive, "w:gz") as tf:
info = tarfile.TarInfo(name="../../evil.txt")
data = b"evil"
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
manager = ExtensionManager(project_dir)
with pytest.raises(ValidationError, match="Unsafe path"):
manager.install_from_zip(archive, "0.1.0")
def test_install_from_tar_gz_rejects_symlinks(self, project_dir, temp_dir):
"""install_from_zip must reject tarballs containing symlinks."""
import tarfile
archive = temp_dir / "symlink.tar.gz"
with tarfile.open(archive, "w:gz") as tf:
info = tarfile.TarInfo(name="link")
info.type = tarfile.SYMTYPE
info.linkname = "/etc/passwd"
tf.addfile(info)
manager = ExtensionManager(project_dir)
with pytest.raises(ValidationError, match="Symlinks"):
manager.install_from_zip(archive, "0.1.0")
# ===== CommandRegistrar Tests =====
class TestCommandRegistrar:
@@ -2759,7 +2627,6 @@ class TestExtensionCatalog:
mock_response = MagicMock()
mock_response.read.return_value = zip_bytes
mock_response.geturl.return_value = "https://github.com/org/repo/releases/download/v1/test-ext.zip"
mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False)
@@ -3662,7 +3529,6 @@ class TestDownloadExtensionBundled:
mock_response = MagicMock()
mock_response.read.return_value = b"fake zip data"
mock_response.geturl.return_value = "https://example.com/git-2.0.0.zip"
mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False)

View File

@@ -649,90 +649,6 @@ class TestPresetManager:
with pytest.raises(PresetValidationError, match="No preset.yml found"):
manager.install_from_zip(zip_path, "0.1.5")
def _make_tarball(self, dest, pack_dir, nested=False):
import tarfile
with tarfile.open(dest, "w:gz") as tf:
for file_path in pack_dir.rglob("*"):
if file_path.is_file():
arcname = file_path.relative_to(pack_dir)
if nested:
arcname = Path("test-pack-v1.0.0") / arcname
tf.add(file_path, arcname=str(arcname))
def test_install_from_tar_gz(self, project_dir, pack_dir, temp_dir):
"""Test installing a preset from a .tar.gz archive."""
archive = temp_dir / "test-pack-1.0.tar.gz"
self._make_tarball(archive, pack_dir)
manager = PresetManager(project_dir)
manifest = manager.install_from_zip(archive, "0.1.5")
assert manifest.id == "test-pack"
assert manager.registry.is_installed("test-pack")
def test_install_from_tgz(self, project_dir, pack_dir, temp_dir):
"""Test installing a preset from a .tgz archive."""
archive = temp_dir / "test-pack-1.0.tgz"
self._make_tarball(archive, pack_dir)
manager = PresetManager(project_dir)
manifest = manager.install_from_zip(archive, "0.1.5")
assert manifest.id == "test-pack"
assert manager.registry.is_installed("test-pack")
def test_install_from_tar_gz_nested(self, project_dir, pack_dir, temp_dir):
"""Test installing a preset from a .tar.gz archive with a single nested directory."""
archive = temp_dir / "test-pack-nested.tar.gz"
self._make_tarball(archive, pack_dir, nested=True)
manager = PresetManager(project_dir)
manifest = manager.install_from_zip(archive, "0.1.5")
assert manifest.id == "test-pack"
assert manager.registry.is_installed("test-pack")
def test_install_from_tar_gz_no_manifest(self, project_dir, temp_dir):
"""Test installing a preset from a .tar.gz without preset.yml raises error."""
import tarfile
import io
archive = temp_dir / "bad.tar.gz"
with tarfile.open(archive, "w:gz") as tf:
data = b"no manifest here"
info = tarfile.TarInfo(name="readme.txt")
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
manager = PresetManager(project_dir)
with pytest.raises(PresetValidationError, match="No preset.yml found"):
manager.install_from_zip(archive, "0.1.5")
def test_install_from_tar_gz_rejects_path_traversal(self, project_dir, temp_dir):
"""install_from_zip must reject tarballs with path traversal entries."""
import tarfile
import io
archive = temp_dir / "evil.tar.gz"
with tarfile.open(archive, "w:gz") as tf:
info = tarfile.TarInfo(name="../../evil.txt")
data = b"evil"
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
manager = PresetManager(project_dir)
with pytest.raises(PresetValidationError, match="Unsafe path"):
manager.install_from_zip(archive, "0.1.5")
def test_install_from_tar_gz_rejects_symlinks(self, project_dir, temp_dir):
"""install_from_zip must reject tarballs containing symlinks."""
import tarfile
archive = temp_dir / "symlink.tar.gz"
with tarfile.open(archive, "w:gz") as tf:
info = tarfile.TarInfo(name="link")
info.type = tarfile.SYMTYPE
info.linkname = "/etc/passwd"
tf.addfile(info)
manager = PresetManager(project_dir)
with pytest.raises(PresetValidationError, match="Symlinks"):
manager.install_from_zip(archive, "0.1.5")
def test_remove(self, project_dir, pack_dir):
"""Test removing a preset."""
manager = PresetManager(project_dir)
@@ -1613,7 +1529,6 @@ class TestPresetCatalog:
mock_response = MagicMock()
mock_response.read.return_value = zip_bytes
mock_response.geturl.return_value = "https://github.com/org/repo/releases/download/v1/test-pack.zip"
mock_response.__enter__ = lambda s: s
mock_response.__exit__ = MagicMock(return_value=False)

View File

@@ -1843,230 +1843,3 @@ steps:
assert state.status == RunStatus.COMPLETED
assert "do-plan" in state.step_results
assert "do-specify" not in state.step_results
# ===== workflow add archive CLI tests =====
MINIMAL_WORKFLOW_YAML = """\
schema_version: "1.0"
workflow:
id: "arc-workflow"
name: "Archive Workflow"
version: "1.0.0"
description: "Installed from archive"
steps:
- id: step-one
type: shell
run: "echo hello"
"""
class TestWorkflowAddArchive:
"""CLI-level tests for `workflow add` with local archive files."""
@pytest.fixture
def project_dir(self, tmp_path):
"""Create a minimal spec-kit project."""
specify = tmp_path / ".specify"
specify.mkdir()
(specify / "workflows").mkdir()
return tmp_path
def _runner_and_app(self):
from typer.testing import CliRunner
from specify_cli import app
return CliRunner(), app
# -- Local ZIP archive --------------------------------------------------
def test_workflow_add_local_zip_flat(self, project_dir):
"""workflow add installs from a local ZIP with workflow.yml at root."""
import zipfile
from unittest.mock import patch
runner, app = self._runner_and_app()
archive = project_dir / "workflow.zip"
with zipfile.ZipFile(archive, "w") as zf:
zf.writestr("workflow.yml", MINIMAL_WORKFLOW_YAML)
with patch.object(Path, "cwd", return_value=project_dir):
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False)
assert result.exit_code == 0, result.output
assert "arc-workflow" in result.output
installed = project_dir / ".specify" / "workflows" / "arc-workflow" / "workflow.yml"
assert installed.exists()
def test_workflow_add_local_zip_nested(self, project_dir):
"""workflow add installs from a local ZIP with workflow.yml in a subdirectory."""
import zipfile
from unittest.mock import patch
runner, app = self._runner_and_app()
archive = project_dir / "workflow.zip"
with zipfile.ZipFile(archive, "w") as zf:
zf.writestr("repo-1.0/workflow.yml", MINIMAL_WORKFLOW_YAML)
with patch.object(Path, "cwd", return_value=project_dir):
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False)
assert result.exit_code == 0, result.output
assert "arc-workflow" in result.output
def test_workflow_add_local_zip_missing_workflow_yml(self, project_dir):
"""workflow add exits with an error when the ZIP has no workflow.yml."""
import zipfile
from unittest.mock import patch
runner, app = self._runner_and_app()
archive = project_dir / "empty.zip"
with zipfile.ZipFile(archive, "w") as zf:
zf.writestr("README.md", "nothing here")
with patch.object(Path, "cwd", return_value=project_dir):
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=True)
assert result.exit_code != 0
assert "extract" in result.output.lower() or "workflow" in result.output.lower()
# -- Local tar.gz archive -----------------------------------------------
def test_workflow_add_local_tar_gz_flat(self, project_dir):
"""workflow add installs from a local .tar.gz with workflow.yml at root."""
import tarfile, io
from unittest.mock import patch
runner, app = self._runner_and_app()
archive = project_dir / "workflow.tar.gz"
with tarfile.open(archive, "w:gz") as tf:
data = MINIMAL_WORKFLOW_YAML.encode()
info = tarfile.TarInfo(name="workflow.yml")
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
with patch.object(Path, "cwd", return_value=project_dir):
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False)
assert result.exit_code == 0, result.output
assert "arc-workflow" in result.output
installed = project_dir / ".specify" / "workflows" / "arc-workflow" / "workflow.yml"
assert installed.exists()
def test_workflow_add_local_tar_gz_nested(self, project_dir):
"""workflow add installs from a local .tar.gz with workflow.yml in a subdirectory."""
import tarfile, io
from unittest.mock import patch
runner, app = self._runner_and_app()
archive = project_dir / "workflow.tar.gz"
with tarfile.open(archive, "w:gz") as tf:
data = MINIMAL_WORKFLOW_YAML.encode()
info = tarfile.TarInfo(name="repo-1.0/workflow.yml")
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
with patch.object(Path, "cwd", return_value=project_dir):
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False)
assert result.exit_code == 0, result.output
assert "arc-workflow" in result.output
def test_workflow_add_local_tgz_flat(self, project_dir):
"""workflow add recognises the .tgz extension as a gzipped tarball."""
import tarfile, io
from unittest.mock import patch
runner, app = self._runner_and_app()
archive = project_dir / "workflow.tgz"
with tarfile.open(archive, "w:gz") as tf:
data = MINIMAL_WORKFLOW_YAML.encode()
info = tarfile.TarInfo(name="workflow.yml")
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
with patch.object(Path, "cwd", return_value=project_dir):
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False)
assert result.exit_code == 0, result.output
assert "arc-workflow" in result.output
def test_workflow_add_local_tar_gz_missing_workflow_yml(self, project_dir):
"""workflow add exits with an error when the .tar.gz has no workflow.yml."""
import tarfile, io
from unittest.mock import patch
runner, app = self._runner_and_app()
archive = project_dir / "empty.tar.gz"
with tarfile.open(archive, "w:gz") as tf:
data = b"nothing"
info = tarfile.TarInfo(name="README.md")
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
with patch.object(Path, "cwd", return_value=project_dir):
result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=True)
assert result.exit_code != 0
assert "extract" in result.output.lower() or "workflow" in result.output.lower()
# -- URL archive download -----------------------------------------------
def test_workflow_add_url_tar_gz(self, project_dir):
"""workflow add downloads a .tar.gz from a URL and installs the workflow."""
import tarfile, io
from unittest.mock import patch, MagicMock
runner, app = self._runner_and_app()
# Build an in-memory tar.gz archive containing workflow.yml.
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:gz") as tf:
data = MINIMAL_WORKFLOW_YAML.encode()
info = tarfile.TarInfo(name="workflow.yml")
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
raw_bytes = buf.getvalue()
mock_resp = MagicMock()
mock_resp.geturl.return_value = "https://example.com/workflow.tar.gz"
mock_resp.headers.get.return_value = "application/gzip"
mock_resp.read.return_value = raw_bytes
mock_resp.__enter__ = lambda s: s
mock_resp.__exit__ = MagicMock(return_value=False)
with patch("urllib.request.urlopen", return_value=mock_resp), \
patch.object(Path, "cwd", return_value=project_dir):
result = runner.invoke(
app, ["workflow", "add", "https://example.com/workflow.tar.gz"],
catch_exceptions=False,
)
assert result.exit_code == 0, result.output
assert "arc-workflow" in result.output
def test_workflow_add_url_zip(self, project_dir):
"""workflow add downloads a .zip from a URL and installs the workflow."""
import zipfile, io
from unittest.mock import patch, MagicMock
runner, app = self._runner_and_app()
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr("workflow.yml", MINIMAL_WORKFLOW_YAML)
raw_bytes = buf.getvalue()
mock_resp = MagicMock()
mock_resp.geturl.return_value = "https://example.com/workflow.zip"
mock_resp.headers.get.return_value = "application/zip"
mock_resp.read.return_value = raw_bytes
mock_resp.__enter__ = lambda s: s
mock_resp.__exit__ = MagicMock(return_value=False)
with patch("urllib.request.urlopen", return_value=mock_resp), \
patch.object(Path, "cwd", return_value=project_dir):
result = runner.invoke(
app, ["workflow", "add", "https://example.com/workflow.zip"],
catch_exceptions=False,
)
assert result.exit_code == 0, result.output
assert "arc-workflow" in result.output