diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index b540ebd1a..54f6661a6 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4326,18 +4326,18 @@ def extension_update( backup_hooks[hook_name] = ext_hooks # 5. Download new version - zip_path = catalog.download_extension(extension_id) + archive_path = catalog.download_extension(extension_id) try: # 6. Validate extension ID from archive BEFORE modifying installation # Handle both root-level and nested extension.yml (GitHub auto-generated archives) from .extensions import _detect_archive_format import tarfile - archive_fmt = _detect_archive_format(str(zip_path)) + archive_fmt = _detect_archive_format(str(archive_path)) import yaml manifest_data = None if archive_fmt == "tar.gz": - with tarfile.open(zip_path, "r:gz") as tf: + with tarfile.open(archive_path, "r:gz") as tf: # First try root-level extension.yml try: m = tf.getmember("extension.yml") @@ -4354,7 +4354,7 @@ def extension_update( with f: manifest_data = yaml.safe_load(f.read()) or {} else: - with zipfile.ZipFile(zip_path, "r") as zf: + with zipfile.ZipFile(archive_path, "r") as zf: namelist = zf.namelist() # First try root-level extension.yml @@ -4382,7 +4382,7 @@ def extension_update( manager.remove(extension_id, keep_config=True) # 8. Install new version - _ = manager.install_from_zip(zip_path, speckit_version) + _ = manager.install_from_zip(archive_path, speckit_version) # Restore user config files from backup after successful install. new_extension_dir = manager.extensions_dir / extension_id @@ -4428,9 +4428,9 @@ def extension_update( hook["enabled"] = False hook_executor.save_project_config(config) finally: - # Clean up downloaded ZIP - if zip_path.exists(): - zip_path.unlink() + # Clean up downloaded archive + if archive_path.exists(): + archive_path.unlink() # 10. Clean up backup on success if backup_base.exists(): diff --git a/src/specify_cli/extensions.py b/src/specify_cli/extensions.py index 6946a1a09..3b25bbfbe 100644 --- a/src/specify_cli/extensions.py +++ b/src/specify_cli/extensions.py @@ -171,50 +171,55 @@ def _safe_extract_tarball( """ dest_resolved = dest_dir.resolve() - with tarfile.open(archive_path, "r:gz") as tf: - members = tf.getmembers() - safe_members = [] + try: + with tarfile.open(archive_path, "r:gz") as tf: + members = tf.getmembers() + safe_members = [] - # Validate every member before extracting anything. - for member in members: - # Reject absolute paths and any path component that is "..". - if os.path.isabs(member.name) or any( - part == ".." for part in member.name.replace("\\", "/").split("/") - ): - raise error_class( - f"Unsafe path in tar archive: {member.name} (potential path traversal)" - ) + # Validate every member before extracting anything. + for member in members: + # Reject absolute paths and any path component that is "..". + if os.path.isabs(member.name) or any( + part == ".." for part in member.name.replace("\\", "/").split("/") + ): + raise error_class( + f"Unsafe path in tar archive: {member.name} (potential path traversal)" + ) - # Confirm the resolved path stays inside dest_dir. - member_path = (dest_dir / member.name).resolve() - try: - member_path.relative_to(dest_resolved) - except ValueError: - raise error_class( - f"Unsafe path in tar archive: {member.name} (potential path traversal)" - ) + # Confirm the resolved path stays inside dest_dir. + member_path = (dest_dir / member.name).resolve() + try: + member_path.relative_to(dest_resolved) + except ValueError: + raise error_class( + f"Unsafe path in tar archive: {member.name} (potential path traversal)" + ) - # Reject symlinks and hard links. - if member.issym() or member.islnk(): - raise error_class( - f"Symlinks are not allowed in archive: {member.name}" - ) + # Reject symlinks and hard links. + if member.issym() or member.islnk(): + raise error_class( + f"Symlinks are not allowed in archive: {member.name}" + ) - # Only allow regular files and directories. - if not (member.isreg() or member.isdir()): - raise error_class( - f"Non-regular file in archive: {member.name}" - ) + # Only allow regular files and directories. + if not (member.isreg() or member.isdir()): + raise error_class( + f"Non-regular file in archive: {member.name}" + ) - safe_members.append(member) + safe_members.append(member) - # Extract — use the "data" filter on Python 3.12+ for extra hardening. - # On older versions pass only the pre-validated members so that no - # unvetted entry (added concurrently or via a race) slips through. - if sys.version_info >= (3, 12): - tf.extractall(dest_dir, filter="data") # type: ignore[call-arg] - else: - tf.extractall(dest_dir, members=safe_members) # noqa: S202 — validated above + # Extract — use the "data" filter on Python 3.12+ for extra hardening. + # On older versions pass only the pre-validated members so that no + # unvetted entry (added concurrently or via a race) slips through. + if sys.version_info >= (3, 12): + tf.extractall(dest_dir, filter="data") # type: ignore[call-arg] + else: + tf.extractall(dest_dir, members=safe_members) # noqa: S202 — validated above + except error_class: + raise + except (tarfile.TarError, OSError) as e: + raise error_class(f"Failed to read archive {archive_path}: {e}") from e @dataclass