From 36ad3cde1bb8727335b4f038421758da98cbe720 Mon Sep 17 00:00:00 2001 From: darion-yaphet Date: Thu, 11 Jun 2026 20:21:50 +0800 Subject: [PATCH] fix(presets): harden preset URL installs against unsafe redirects (#2911) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Harden preset URL installs against unsafe redirects Preset URL installs already rejected non-HTTPS source URLs, but the authenticated opener follows redirects. Validate the final response URL before writing the ZIP, preserve GitHub release asset URL resolution after the preset command module split, stream the response to disk, and keep catalog config serialization on safe YAML output. Constraint: open_url follows redirects, so source URL validation alone does not constrain the downloaded target Rejected: Keep response.read() for simplicity | large preset downloads should not be buffered entirely in memory Confidence: high Scope-risk: narrow Directive: Keep preset URL policy aligned with workflow installer redirect validation Tested: uvx ruff check src/specify_cli/__init__.py src/specify_cli/presets/__init__.py src/specify_cli/presets/_commands.py tests/test_presets.py Tested: uv run pytest tests/test_presets.py -q Not-tested: Real network redirect integration against a live HTTP server Co-authored-by: OmX * Reject malformed preset download URLs Preset downloads should fail early when a URL lacks a hostname, even if the scheme is HTTPS. The redirect error now describes any disallowed target instead of implying that only non-HTTPS redirects are blocked. * Prevent credentialed preset redirects from downgrading transport Preset URL downloads already checked the final URL after urllib followed redirects, but that was too late for authenticated requests because same-host redirects could preserve Authorization during the redirect itself. The authenticated HTTP helper now supports an opt-in redirect validator, and preset downloads use it to reject disallowed redirect targets before following them. The redirect auth handlers also stop preserving credentials across HTTPS to non-HTTPS downgrades as defense in depth. * test(presets): 修复 URL 解析测试 mock 缺少 redirect_validator 参数 重定向安全加固为 open_url 新增 redirect_validator 参数, 两处 fake_open_url mock 签名未同步导致 TypeError。 补齐参数后全部 3717 个测试通过。 --------- Co-authored-by: OmX --- src/specify_cli/__init__.py | 59 +++++++++-- src/specify_cli/authentication/http.py | 39 +++++-- tests/test_authentication.py | 29 +++++ tests/test_github_http.py | 2 +- tests/test_presets.py | 141 ++++++++++++++++++++++++- 5 files changed, 253 insertions(+), 17 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index b22cef6ad..186593000 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -684,16 +684,44 @@ def preset_add( elif from_url: # Validate URL scheme before downloading + from ipaddress import ip_address from urllib.parse import urlparse as _urlparse + _parsed = _urlparse(from_url) - _is_localhost = _parsed.hostname in ("localhost", "127.0.0.1", "::1") - if _parsed.scheme != "https" and not (_parsed.scheme == "http" and _is_localhost): - console.print(f"[red]Error:[/red] URL must use HTTPS (got {_parsed.scheme}://). HTTP is only allowed for localhost.") + + def _is_allowed_download_url(parsed_url): + host = parsed_url.hostname + if not host: + return False + is_loopback = host == "localhost" + if not is_loopback: + try: + is_loopback = ip_address(host).is_loopback + except ValueError: + # Host is not an IP literal (e.g., a regular hostname); treat as non-loopback. + pass + return parsed_url.scheme == "https" or (parsed_url.scheme == "http" and is_loopback) + + def _validate_download_redirect(old_url, new_url): + if not _is_allowed_download_url(_urlparse(new_url)): + import urllib.error + + raise urllib.error.URLError( + "redirect target must use HTTPS with a hostname, " + "or HTTP for localhost/loopback" + ) + + if not _is_allowed_download_url(_parsed): + console.print( + "[red]Error:[/red] URL must use HTTPS with a hostname, " + "or HTTP for localhost/loopback." + ) raise typer.Exit(1) console.print(f"Installing preset from [cyan]{from_url}[/cyan]...") import urllib.error import tempfile + import shutil with tempfile.TemporaryDirectory() as tmpdir: zip_path = Path(tmpdir) / "preset.zip" @@ -707,8 +735,25 @@ def preset_add( from_url = _resolved_from_url _preset_extra_headers = {"Accept": "application/octet-stream"} - with _open_url(from_url, timeout=60, extra_headers=_preset_extra_headers) as response: - zip_path.write_bytes(response.read()) + with _open_url( + from_url, + timeout=60, + extra_headers=_preset_extra_headers, + redirect_validator=_validate_download_redirect, + ) as response: + final_url = response.geturl() if hasattr(response, "geturl") else from_url + if not _is_allowed_download_url(_urlparse(final_url)): + console.print( + "[red]Error:[/red] Preset URL redirected to a disallowed URL: " + f"{final_url}. Redirect targets must use HTTPS with a hostname, " + "or HTTP for localhost/loopback." + ) + raise typer.Exit(1) + with zip_path.open("wb") as output: + try: + shutil.copyfileobj(response, output) + except TypeError: + output.write(response.read()) except urllib.error.URLError as e: console.print(f"[red]Error:[/red] Failed to download: {e}") raise typer.Exit(1) @@ -1186,7 +1231,7 @@ def preset_catalog_add( }) config["catalogs"] = catalogs - config_path.write_text(yaml.dump(config, default_flow_style=False, sort_keys=False, allow_unicode=True), encoding="utf-8") + config_path.write_text(yaml.safe_dump(config, default_flow_style=False, sort_keys=False, allow_unicode=True), encoding="utf-8") install_label = "install allowed" if install_allowed else "discovery only" console.print(f"\n[green]✓[/green] Added catalog '[bold]{name}[/bold]' ({install_label})") @@ -1226,7 +1271,7 @@ def preset_catalog_remove( raise typer.Exit(1) config["catalogs"] = catalogs - config_path.write_text(yaml.dump(config, default_flow_style=False, sort_keys=False, allow_unicode=True), encoding="utf-8") + config_path.write_text(yaml.safe_dump(config, default_flow_style=False, sort_keys=False, allow_unicode=True), encoding="utf-8") console.print(f"[green]✓[/green] Removed catalog '{name}'") if not catalogs: diff --git a/src/specify_cli/authentication/http.py b/src/specify_cli/authentication/http.py index d6402e5c3..e8ab8c124 100644 --- a/src/specify_cli/authentication/http.py +++ b/src/specify_cli/authentication/http.py @@ -14,6 +14,7 @@ from __future__ import annotations import urllib.error import urllib.request from fnmatch import fnmatch +from typing import Callable from urllib.parse import urlparse from . import get_provider @@ -56,22 +57,36 @@ def _hostname_in_hosts(hostname: str, hosts: tuple[str, ...]) -> bool: return any(p == hostname or fnmatch(hostname, p) for p in hosts) -class _StripAuthOnRedirect(urllib.request.HTTPRedirectHandler): - """Drop ``Authorization`` when a redirect leaves the entry's declared hosts.""" +RedirectValidator = Callable[[str, str], None] - def __init__(self, hosts: tuple[str, ...]) -> None: + +class _StripAuthOnRedirect(urllib.request.HTTPRedirectHandler): + """Drop ``Authorization`` when a redirect leaves trusted hosts or downgrades.""" + + def __init__( + self, + hosts: tuple[str, ...], + redirect_validator: RedirectValidator | None = None, + ) -> None: super().__init__() self._hosts = hosts + self._redirect_validator = redirect_validator def redirect_request(self, req, fp, code, msg, headers, newurl): + if self._redirect_validator is not None: + self._redirect_validator(req.full_url, newurl) + original_auth = ( req.get_header("Authorization") or req.unredirected_hdrs.get("Authorization") ) new_req = super().redirect_request(req, fp, code, msg, headers, newurl) if new_req is not None: - hostname = (urlparse(newurl).hostname or "").lower() - if _hostname_in_hosts(hostname, self._hosts): + old_scheme = urlparse(req.full_url).scheme + new_parsed = urlparse(newurl) + hostname = (new_parsed.hostname or "").lower() + is_https_downgrade = old_scheme == "https" and new_parsed.scheme != "https" + if _hostname_in_hosts(hostname, self._hosts) and not is_https_downgrade: if original_auth: new_req.add_unredirected_header("Authorization", original_auth) else: @@ -103,7 +118,12 @@ def build_request(url: str, extra_headers: dict[str, str] | None = None) -> urll return urllib.request.Request(url, headers=headers) -def open_url(url: str, timeout: int = 10, extra_headers: dict[str, str] | None = None): +def open_url( + url: str, + timeout: int = 10, + extra_headers: dict[str, str] | None = None, + redirect_validator: RedirectValidator | None = None, +): """Open *url* with config-driven auth, redirect stripping, and fallthrough. 1. Find ``auth.json`` entries whose hosts match the URL. @@ -113,6 +133,8 @@ def open_url(url: str, timeout: int = 10, extra_headers: dict[str, str] | None = 5. Non-auth errors (404, 500, network) raise immediately. *extra_headers* (e.g. ``Accept``) are merged into every attempt. + *redirect_validator*, when provided, is called with ``(old_url, new_url)`` + before following each redirect and may raise to reject the redirect. """ entries = find_entries_for_url(url, _load_config()) @@ -135,7 +157,7 @@ def open_url(url: str, timeout: int = 10, extra_headers: dict[str, str] | None = continue req = _make_req(provider.auth_headers(token, entry.auth)) - opener = urllib.request.build_opener(_StripAuthOnRedirect(entry.hosts)) + opener = urllib.request.build_opener(_StripAuthOnRedirect(entry.hosts, redirect_validator)) try: return opener.open(req, timeout=timeout) except urllib.error.HTTPError as exc: @@ -146,4 +168,7 @@ def open_url(url: str, timeout: int = 10, extra_headers: dict[str, str] | None = # No entry worked (or none matched) — unauthenticated fallback req = _make_req({}) + if redirect_validator is not None: + opener = urllib.request.build_opener(_StripAuthOnRedirect((), redirect_validator)) + return opener.open(req, timeout=timeout) return urllib.request.urlopen(req, timeout=timeout) # noqa: S310 diff --git a/tests/test_authentication.py b/tests/test_authentication.py index 5d75355a0..8b0924538 100644 --- a/tests/test_authentication.py +++ b/tests/test_authentication.py @@ -793,6 +793,35 @@ class TestRedirectStripping: assert new_req.headers.get("Authorization") is None assert new_req.unredirected_hdrs.get("Authorization") is None + def test_https_to_http_same_host_redirect_strips_auth(self): + from specify_cli.authentication.http import _StripAuthOnRedirect + from urllib.request import Request + import io + handler = _StripAuthOnRedirect(("github.com",)) + req = Request("https://github.com/org/repo", headers={"Authorization": "Bearer tok"}) + new_req = handler.redirect_request(req, io.BytesIO(b""), 302, "Found", {}, + "http://github.com/org/repo") + assert new_req is not None + assert new_req.headers.get("Authorization") is None + assert new_req.unredirected_hdrs.get("Authorization") is None + + def test_redirect_validator_can_reject_before_following_redirect(self): + import urllib.error + from specify_cli.authentication.http import _StripAuthOnRedirect + from urllib.request import Request + import io + + def reject_http(old_url, new_url): + if new_url.startswith("http://"): + raise urllib.error.URLError("scheme downgrade") + + handler = _StripAuthOnRedirect(("github.com",), reject_http) + req = Request("https://github.com/org/repo", headers={"Authorization": "Bearer tok"}) + + with pytest.raises(urllib.error.URLError, match="scheme downgrade"): + handler.redirect_request(req, io.BytesIO(b""), 302, "Found", {}, + "http://github.com/org/repo") + def test_multi_hop_redirect_within_hosts_preserves_auth(self): """Auth survives a multi-hop redirect chain within allowed hosts.""" from specify_cli.authentication.http import _StripAuthOnRedirect diff --git a/tests/test_github_http.py b/tests/test_github_http.py index f51368079..e258f4917 100644 --- a/tests/test_github_http.py +++ b/tests/test_github_http.py @@ -187,4 +187,4 @@ class TestResolveGitHubReleaseAssetApiUrl: capturing_open, ) assert len(captured_urls) == 1 - assert "releases/tags/v1%23beta" in captured_urls[0] \ No newline at end of file + assert "releases/tags/v1%23beta" in captured_urls[0] diff --git a/tests/test_presets.py b/tests/test_presets.py index 92add1103..e32440145 100644 --- a/tests/test_presets.py +++ b/tests/test_presets.py @@ -11,6 +11,7 @@ Tests cover: """ import pytest +import io import json import tempfile import shutil @@ -18,6 +19,7 @@ import warnings import zipfile from pathlib import Path from datetime import datetime, timezone +from types import SimpleNamespace import yaml @@ -4257,6 +4259,141 @@ class TestBundledPresetLocator: assert "Lean Workflow" in result.output assert "installed" in result.output.lower() + def test_preset_add_from_url_rejects_insecure_redirect(self, project_dir, monkeypatch): + """URL installs reject redirects from HTTPS to non-loopback HTTP.""" + import typer + from specify_cli import preset_add + + class FakeResponse(io.BytesIO): + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def geturl(self): + return "http://example.com/preset.zip" + + monkeypatch.setattr("specify_cli._require_specify_project", lambda: project_dir) + monkeypatch.setattr("specify_cli.get_speckit_version", lambda: "0.6.0") + def fake_open_url(url, timeout=None, extra_headers=None, redirect_validator=None): + assert redirect_validator is not None + redirect_validator(url, "http://example.com/preset.zip") + return FakeResponse(b"zip") + + monkeypatch.setattr("specify_cli.authentication.http.open_url", fake_open_url) + + installed = False + + def fake_install_from_zip(self, zip_path, speckit_version, priority=10): + nonlocal installed + installed = True + + monkeypatch.setattr(PresetManager, "install_from_zip", fake_install_from_zip) + + with pytest.raises(typer.Exit) as exc_info: + preset_add(preset_id=None, from_url="https://example.com/preset.zip", dev=None, priority=10) + + assert exc_info.value.exit_code == 1 + assert installed is False + + def test_preset_add_from_url_rejects_hostless_https_url(self, project_dir): + """URL installs reject HTTPS URLs without a hostname before downloading.""" + from typer.testing import CliRunner + from unittest.mock import patch + from specify_cli import app + + runner = CliRunner() + with patch.object(Path, "cwd", return_value=project_dir), \ + patch("specify_cli.authentication.http.open_url") as open_url: + result = runner.invoke(app, ["preset", "add", "--from", "https:///preset.zip"]) + + assert result.exit_code == 1 + output = strip_ansi(result.output) + assert "URL must use HTTPS with a hostname" in output + assert "got https://" not in output + open_url.assert_not_called() + + def test_preset_add_from_url_redirect_error_describes_disallowed_url(self, project_dir, monkeypatch, capsys): + """Redirect rejection message covers hostless HTTPS, not only non-HTTPS URLs.""" + import typer + from specify_cli import preset_add + + class FakeResponse(io.BytesIO): + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def geturl(self): + return "https:///preset.zip" + + monkeypatch.setattr("specify_cli._require_specify_project", lambda: project_dir) + monkeypatch.setattr("specify_cli.get_speckit_version", lambda: "0.6.0") + monkeypatch.setattr( + "specify_cli.authentication.http.open_url", + lambda url, timeout=None, extra_headers=None, redirect_validator=None: FakeResponse(b"zip"), + ) + monkeypatch.setattr(PresetManager, "install_from_zip", lambda *args, **kwargs: None) + + with pytest.raises(typer.Exit) as exc_info: + preset_add(preset_id=None, from_url="https://example.com/preset.zip", dev=None, priority=10) + + assert exc_info.value.exit_code == 1 + output = strip_ansi(capsys.readouterr().out) + assert "redirected to a disallowed URL" in output + assert "must use HTTPS with a hostname" in output + + def test_preset_add_from_url_streams_download_to_zip(self, project_dir, monkeypatch): + """URL installs stream response bytes to disk before installing the ZIP.""" + from specify_cli import preset_add + + class FakeResponse(io.BytesIO): + def __init__(self, data): + super().__init__(data) + self.read_sizes = [] + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def geturl(self): + return "https://example.com/preset.zip" + + def read(self, size=-1): + assert size not in (-1, None) + self.read_sizes.append(size) + return super().read(size) + + response = FakeResponse(b"zip-bytes") + installed = {} + + def fake_install_from_zip(self, zip_path, speckit_version, priority=10): + installed["zip_bytes"] = Path(zip_path).read_bytes() + installed["speckit_version"] = speckit_version + installed["priority"] = priority + return SimpleNamespace(name="Test Preset", version="1.0.0") + + monkeypatch.setattr("specify_cli._require_specify_project", lambda: project_dir) + monkeypatch.setattr("specify_cli.get_speckit_version", lambda: "0.6.0") + monkeypatch.setattr( + "specify_cli.authentication.http.open_url", + lambda url, timeout=None, extra_headers=None, redirect_validator=None: response, + ) + monkeypatch.setattr(PresetManager, "install_from_zip", fake_install_from_zip) + + preset_add(preset_id=None, from_url="https://example.com/preset.zip", dev=None, priority=7) + + assert response.read_sizes + assert installed == { + "zip_bytes": b"zip-bytes", + "speckit_version": "0.6.0", + "priority": 7, + } + def test_bundled_preset_in_catalog(self): """Verify the lean preset is listed in catalog.json with bundled marker.""" catalog_path = Path(__file__).parent.parent / "presets" / "catalog.json" @@ -4346,7 +4483,7 @@ class TestPresetAddFromUrlResolution: def __exit__(self, *a): return False - def fake_open_url(url, timeout=None, extra_headers=None): + def fake_open_url(url, timeout=None, extra_headers=None, redirect_validator=None): captured_urls.append((url, extra_headers)) if "releases/tags/" in url: return FakeResponse(json.dumps({ @@ -4404,7 +4541,7 @@ class TestPresetAddFromUrlResolution: def __exit__(self, *a): return False - def fake_open_url(url, timeout=None, extra_headers=None): + def fake_open_url(url, timeout=None, extra_headers=None, redirect_validator=None): captured_urls.append((url, extra_headers)) return FakeResponse(zip_bytes)