mirror of
https://github.com/github/spec-kit.git
synced 2026-07-03 12:28:06 +08:00
Fix GNU sparse skip in safe_extract_tarball; use response.geturl() for redirect-safe format detection and HTTPS re-check
Agent-Logs-Url: https://github.com/github/spec-kit/sessions/739d3f73-200b-417a-8a86-134329200560 Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
05798a9e70
commit
1015ff24da
@@ -2633,16 +2633,25 @@ def preset_add(
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
archive_fmt = _det_fmt(from_url)
|
||||
final_url = from_url
|
||||
try:
|
||||
with urllib.request.urlopen(from_url, timeout=60) as response:
|
||||
final_url = response.geturl()
|
||||
if not archive_fmt:
|
||||
content_type = response.headers.get("Content-Type", "")
|
||||
archive_fmt = _det_fmt(from_url, content_type)
|
||||
archive_fmt = _det_fmt(final_url, content_type)
|
||||
archive_data = response.read()
|
||||
except urllib.error.URLError as e:
|
||||
console.print(f"[red]Error:[/red] Failed to download: {e}")
|
||||
raise typer.Exit(1)
|
||||
|
||||
# Re-validate scheme after any redirect (scheme-downgrade guard).
|
||||
_fp = _urlparse(final_url)
|
||||
_fl = _fp.hostname in ("localhost", "127.0.0.1", "::1")
|
||||
if _fp.scheme != "https" and not (_fp.scheme == "http" and _fl):
|
||||
console.print(f"[red]Error:[/red] URL was redirected to a non-HTTPS URL: {final_url}")
|
||||
raise typer.Exit(1)
|
||||
|
||||
if not archive_fmt:
|
||||
console.print("[red]Error:[/red] Could not determine archive format from URL or Content-Type.")
|
||||
console.print("Ensure the URL points to a .zip or .tar.gz/.tgz file.")
|
||||
@@ -3652,15 +3661,24 @@ def extension_add(
|
||||
download_dir = project_root / ".specify" / "extensions" / ".cache" / "downloads"
|
||||
download_dir.mkdir(parents=True, exist_ok=True)
|
||||
archive_fmt = detect_archive_format(from_url)
|
||||
final_url = from_url
|
||||
archive_path = None
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(from_url, timeout=60) as response:
|
||||
final_url = response.geturl()
|
||||
if not archive_fmt:
|
||||
content_type = response.headers.get("Content-Type", "")
|
||||
archive_fmt = detect_archive_format(from_url, content_type)
|
||||
archive_fmt = detect_archive_format(final_url, content_type)
|
||||
archive_data = response.read()
|
||||
|
||||
# Re-validate scheme after any redirect (scheme-downgrade guard).
|
||||
_fp = urlparse(final_url)
|
||||
_fl = _fp.hostname in ("localhost", "127.0.0.1", "::1")
|
||||
if _fp.scheme != "https" and not (_fp.scheme == "http" and _fl):
|
||||
console.print(f"[red]Error:[/red] URL was redirected to a non-HTTPS URL: {final_url}")
|
||||
raise typer.Exit(1)
|
||||
|
||||
if not archive_fmt:
|
||||
console.print("[red]Error:[/red] Could not determine archive format from URL or Content-Type.")
|
||||
console.print("Ensure the URL points to a .zip or .tar.gz/.tgz file.")
|
||||
|
||||
@@ -173,11 +173,15 @@ def safe_extract_tarball(
|
||||
# Tar metadata member types to skip during validation — they carry no
|
||||
# extractable payload and are generated automatically by many common
|
||||
# archiving tools (e.g. PAX headers, GNU longname/longlink entries).
|
||||
# GNUTYPE_SPARSE is intentionally excluded: it carries a real file payload
|
||||
# and tarfile.TarInfo.isreg() returns True for it, so it passes the
|
||||
# regular-file check below and is extracted correctly.
|
||||
_TAR_METADATA_TYPES = (
|
||||
tarfile.XHDTYPE, # PAX extended header
|
||||
tarfile.XGLTYPE, # PAX global extended header
|
||||
tarfile.SOLARIS_XHDTYPE, # Solaris PAX extended header
|
||||
*tarfile.GNU_TYPES, # GNU longname / longlink / sparse
|
||||
tarfile.XHDTYPE, # PAX extended header
|
||||
tarfile.XGLTYPE, # PAX global extended header
|
||||
tarfile.SOLARIS_XHDTYPE, # Solaris PAX extended header
|
||||
tarfile.GNUTYPE_LONGNAME, # GNU long path name (metadata only)
|
||||
tarfile.GNUTYPE_LONGLINK, # GNU long link name (metadata only)
|
||||
)
|
||||
|
||||
try:
|
||||
@@ -2153,14 +2157,17 @@ class ExtensionCatalog:
|
||||
version = ext_info.get("version", "unknown")
|
||||
|
||||
# Detect archive format from URL; resolve via Content-Type when needed.
|
||||
# `final_url` may differ from `download_url` if the server redirects.
|
||||
archive_fmt = detect_archive_format(download_url)
|
||||
final_url = download_url
|
||||
|
||||
# Download the archive
|
||||
try:
|
||||
with self._open_url(download_url, timeout=60) as response:
|
||||
final_url = response.geturl()
|
||||
if not archive_fmt:
|
||||
content_type = response.headers.get("Content-Type", "")
|
||||
archive_fmt = detect_archive_format(download_url, content_type)
|
||||
archive_fmt = detect_archive_format(final_url, content_type)
|
||||
archive_data = response.read()
|
||||
|
||||
except urllib.error.URLError as e:
|
||||
@@ -2168,6 +2175,16 @@ class ExtensionCatalog:
|
||||
except IOError as e:
|
||||
raise ExtensionError(f"Failed to read extension archive from {download_url}: {e}")
|
||||
|
||||
# Re-validate scheme after any redirect to guard against scheme-downgrade.
|
||||
_final_parsed = urlparse(final_url)
|
||||
_final_is_localhost = _final_parsed.hostname in ("localhost", "127.0.0.1", "::1")
|
||||
if _final_parsed.scheme != "https" and not (
|
||||
_final_parsed.scheme == "http" and _final_is_localhost
|
||||
):
|
||||
raise ExtensionError(
|
||||
f"Extension download URL was redirected to a non-HTTPS URL: {final_url}"
|
||||
)
|
||||
|
||||
# Choose file extension based on detected format.
|
||||
if not archive_fmt:
|
||||
raise ExtensionError(
|
||||
|
||||
@@ -2314,13 +2314,16 @@ class PresetCatalog:
|
||||
version = pack_info.get("version", "unknown")
|
||||
|
||||
# Detect archive format from URL; resolve via Content-Type when needed.
|
||||
# `final_url` may differ from `download_url` if the server redirects.
|
||||
archive_fmt = detect_archive_format(download_url)
|
||||
final_url = download_url
|
||||
|
||||
try:
|
||||
with self._open_url(download_url, timeout=60) as response:
|
||||
final_url = response.geturl()
|
||||
if not archive_fmt:
|
||||
content_type = response.headers.get("Content-Type", "")
|
||||
archive_fmt = detect_archive_format(download_url, content_type)
|
||||
archive_fmt = detect_archive_format(final_url, content_type)
|
||||
archive_data = response.read()
|
||||
|
||||
except urllib.error.URLError as e:
|
||||
@@ -2330,6 +2333,16 @@ class PresetCatalog:
|
||||
except IOError as e:
|
||||
raise PresetError(f"Failed to read preset archive from {download_url}: {e}")
|
||||
|
||||
# Re-validate scheme after any redirect to guard against scheme-downgrade.
|
||||
_final_parsed = urlparse(final_url)
|
||||
_final_is_localhost = _final_parsed.hostname in ("localhost", "127.0.0.1", "::1")
|
||||
if _final_parsed.scheme != "https" and not (
|
||||
_final_parsed.scheme == "http" and _final_is_localhost
|
||||
):
|
||||
raise PresetError(
|
||||
f"Preset download URL was redirected to a non-HTTPS URL: {final_url}"
|
||||
)
|
||||
|
||||
# Choose file extension based on detected format.
|
||||
if not archive_fmt:
|
||||
raise PresetError(
|
||||
|
||||
@@ -2759,6 +2759,7 @@ class TestExtensionCatalog:
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.read.return_value = zip_bytes
|
||||
mock_response.geturl.return_value = "https://github.com/org/repo/releases/download/v1/test-ext.zip"
|
||||
mock_response.__enter__ = lambda s: s
|
||||
mock_response.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
@@ -3661,6 +3662,7 @@ class TestDownloadExtensionBundled:
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.read.return_value = b"fake zip data"
|
||||
mock_response.geturl.return_value = "https://example.com/git-2.0.0.zip"
|
||||
mock_response.__enter__ = lambda s: s
|
||||
mock_response.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
|
||||
@@ -1613,6 +1613,7 @@ class TestPresetCatalog:
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.read.return_value = zip_bytes
|
||||
mock_response.geturl.return_value = "https://github.com/org/repo/releases/download/v1/test-pack.zip"
|
||||
mock_response.__enter__ = lambda s: s
|
||||
mock_response.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user