Catch TarError/OSError in _safe_extract_tarball; rename zip_path to archive_path in extension_update

Agent-Logs-Url: https://github.com/github/spec-kit/sessions/953d7f62-a75a-4690-90a9-98345cae824d

Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2026-05-06 19:38:57 +00:00
committed by GitHub
parent d00509e770
commit 0fd0bf6b9f
2 changed files with 51 additions and 46 deletions

View File

@@ -4326,18 +4326,18 @@ def extension_update(
backup_hooks[hook_name] = ext_hooks
# 5. Download new version
zip_path = catalog.download_extension(extension_id)
archive_path = catalog.download_extension(extension_id)
try:
# 6. Validate extension ID from archive BEFORE modifying installation
# Handle both root-level and nested extension.yml (GitHub auto-generated archives)
from .extensions import _detect_archive_format
import tarfile
archive_fmt = _detect_archive_format(str(zip_path))
archive_fmt = _detect_archive_format(str(archive_path))
import yaml
manifest_data = None
if archive_fmt == "tar.gz":
with tarfile.open(zip_path, "r:gz") as tf:
with tarfile.open(archive_path, "r:gz") as tf:
# First try root-level extension.yml
try:
m = tf.getmember("extension.yml")
@@ -4354,7 +4354,7 @@ def extension_update(
with f:
manifest_data = yaml.safe_load(f.read()) or {}
else:
with zipfile.ZipFile(zip_path, "r") as zf:
with zipfile.ZipFile(archive_path, "r") as zf:
namelist = zf.namelist()
# First try root-level extension.yml
@@ -4382,7 +4382,7 @@ def extension_update(
manager.remove(extension_id, keep_config=True)
# 8. Install new version
_ = manager.install_from_zip(zip_path, speckit_version)
_ = manager.install_from_zip(archive_path, speckit_version)
# Restore user config files from backup after successful install.
new_extension_dir = manager.extensions_dir / extension_id
@@ -4428,9 +4428,9 @@ def extension_update(
hook["enabled"] = False
hook_executor.save_project_config(config)
finally:
# Clean up downloaded ZIP
if zip_path.exists():
zip_path.unlink()
# Clean up downloaded archive
if archive_path.exists():
archive_path.unlink()
# 10. Clean up backup on success
if backup_base.exists():

View File

@@ -171,50 +171,55 @@ def _safe_extract_tarball(
"""
dest_resolved = dest_dir.resolve()
with tarfile.open(archive_path, "r:gz") as tf:
members = tf.getmembers()
safe_members = []
try:
with tarfile.open(archive_path, "r:gz") as tf:
members = tf.getmembers()
safe_members = []
# Validate every member before extracting anything.
for member in members:
# Reject absolute paths and any path component that is "..".
if os.path.isabs(member.name) or any(
part == ".." for part in member.name.replace("\\", "/").split("/")
):
raise error_class(
f"Unsafe path in tar archive: {member.name} (potential path traversal)"
)
# Validate every member before extracting anything.
for member in members:
# Reject absolute paths and any path component that is "..".
if os.path.isabs(member.name) or any(
part == ".." for part in member.name.replace("\\", "/").split("/")
):
raise error_class(
f"Unsafe path in tar archive: {member.name} (potential path traversal)"
)
# Confirm the resolved path stays inside dest_dir.
member_path = (dest_dir / member.name).resolve()
try:
member_path.relative_to(dest_resolved)
except ValueError:
raise error_class(
f"Unsafe path in tar archive: {member.name} (potential path traversal)"
)
# Confirm the resolved path stays inside dest_dir.
member_path = (dest_dir / member.name).resolve()
try:
member_path.relative_to(dest_resolved)
except ValueError:
raise error_class(
f"Unsafe path in tar archive: {member.name} (potential path traversal)"
)
# Reject symlinks and hard links.
if member.issym() or member.islnk():
raise error_class(
f"Symlinks are not allowed in archive: {member.name}"
)
# Reject symlinks and hard links.
if member.issym() or member.islnk():
raise error_class(
f"Symlinks are not allowed in archive: {member.name}"
)
# Only allow regular files and directories.
if not (member.isreg() or member.isdir()):
raise error_class(
f"Non-regular file in archive: {member.name}"
)
# Only allow regular files and directories.
if not (member.isreg() or member.isdir()):
raise error_class(
f"Non-regular file in archive: {member.name}"
)
safe_members.append(member)
safe_members.append(member)
# Extract — use the "data" filter on Python 3.12+ for extra hardening.
# On older versions pass only the pre-validated members so that no
# unvetted entry (added concurrently or via a race) slips through.
if sys.version_info >= (3, 12):
tf.extractall(dest_dir, filter="data") # type: ignore[call-arg]
else:
tf.extractall(dest_dir, members=safe_members) # noqa: S202 — validated above
# Extract — use the "data" filter on Python 3.12+ for extra hardening.
# On older versions pass only the pre-validated members so that no
# unvetted entry (added concurrently or via a race) slips through.
if sys.version_info >= (3, 12):
tf.extractall(dest_dir, filter="data") # type: ignore[call-arg]
else:
tf.extractall(dest_dir, members=safe_members) # noqa: S202 — validated above
except error_class:
raise
except (tarfile.TarError, OSError) as e:
raise error_class(f"Failed to read archive {archive_path}: {e}") from e
@dataclass