fix(presets): harden preset URL installs against unsafe redirects (#2911)

* Harden preset URL installs against unsafe redirects

Preset URL installs already rejected non-HTTPS source URLs, but the authenticated opener follows redirects. Validate the final response URL before writing the ZIP, preserve GitHub release asset URL resolution after the preset command module split, stream the response to disk, and keep catalog config serialization on safe YAML output.

Constraint: open_url follows redirects, so source URL validation alone does not constrain the downloaded target

Rejected: Keep response.read() for simplicity | large preset downloads should not be buffered entirely in memory

Confidence: high

Scope-risk: narrow

Directive: Keep preset URL policy aligned with workflow installer redirect validation

Tested: uvx ruff check src/specify_cli/__init__.py src/specify_cli/presets/__init__.py src/specify_cli/presets/_commands.py tests/test_presets.py

Tested: uv run pytest tests/test_presets.py -q

Not-tested: Real network redirect integration against a live HTTP server

Co-authored-by: OmX <omx@oh-my-codex.dev>

* Reject malformed preset download URLs

Preset downloads should fail early when a URL lacks a hostname, even if the scheme is HTTPS. The redirect error now describes any disallowed target instead of implying that only non-HTTPS redirects are blocked.

* Prevent credentialed preset redirects from downgrading transport

Preset URL downloads already checked the final URL after urllib followed redirects, but that was too late for authenticated requests because same-host redirects could preserve Authorization during the redirect itself. The authenticated HTTP helper now supports an opt-in redirect validator, and preset downloads use it to reject disallowed redirect targets before following them. The redirect auth handlers also stop preserving credentials across HTTPS to non-HTTPS downgrades as defense in depth.

* test(presets): 修复 URL 解析测试 mock 缺少 redirect_validator 参数

重定向安全加固为 open_url 新增 redirect_validator 参数,
两处 fake_open_url mock 签名未同步导致 TypeError。
补齐参数后全部 3717 个测试通过。

---------

Co-authored-by: OmX <omx@oh-my-codex.dev>
This commit is contained in:
darion-yaphet
2026-06-11 20:21:50 +08:00
committed by GitHub
parent 5ae7ff53d0
commit 36ad3cde1b
5 changed files with 253 additions and 17 deletions

View File

@@ -684,16 +684,44 @@ def preset_add(
elif from_url:
# Validate URL scheme before downloading
from ipaddress import ip_address
from urllib.parse import urlparse as _urlparse
_parsed = _urlparse(from_url)
_is_localhost = _parsed.hostname in ("localhost", "127.0.0.1", "::1")
if _parsed.scheme != "https" and not (_parsed.scheme == "http" and _is_localhost):
console.print(f"[red]Error:[/red] URL must use HTTPS (got {_parsed.scheme}://). HTTP is only allowed for localhost.")
def _is_allowed_download_url(parsed_url):
host = parsed_url.hostname
if not host:
return False
is_loopback = host == "localhost"
if not is_loopback:
try:
is_loopback = ip_address(host).is_loopback
except ValueError:
# Host is not an IP literal (e.g., a regular hostname); treat as non-loopback.
pass
return parsed_url.scheme == "https" or (parsed_url.scheme == "http" and is_loopback)
def _validate_download_redirect(old_url, new_url):
if not _is_allowed_download_url(_urlparse(new_url)):
import urllib.error
raise urllib.error.URLError(
"redirect target must use HTTPS with a hostname, "
"or HTTP for localhost/loopback"
)
if not _is_allowed_download_url(_parsed):
console.print(
"[red]Error:[/red] URL must use HTTPS with a hostname, "
"or HTTP for localhost/loopback."
)
raise typer.Exit(1)
console.print(f"Installing preset from [cyan]{from_url}[/cyan]...")
import urllib.error
import tempfile
import shutil
with tempfile.TemporaryDirectory() as tmpdir:
zip_path = Path(tmpdir) / "preset.zip"
@@ -707,8 +735,25 @@ def preset_add(
from_url = _resolved_from_url
_preset_extra_headers = {"Accept": "application/octet-stream"}
with _open_url(from_url, timeout=60, extra_headers=_preset_extra_headers) as response:
zip_path.write_bytes(response.read())
with _open_url(
from_url,
timeout=60,
extra_headers=_preset_extra_headers,
redirect_validator=_validate_download_redirect,
) as response:
final_url = response.geturl() if hasattr(response, "geturl") else from_url
if not _is_allowed_download_url(_urlparse(final_url)):
console.print(
"[red]Error:[/red] Preset URL redirected to a disallowed URL: "
f"{final_url}. Redirect targets must use HTTPS with a hostname, "
"or HTTP for localhost/loopback."
)
raise typer.Exit(1)
with zip_path.open("wb") as output:
try:
shutil.copyfileobj(response, output)
except TypeError:
output.write(response.read())
except urllib.error.URLError as e:
console.print(f"[red]Error:[/red] Failed to download: {e}")
raise typer.Exit(1)
@@ -1186,7 +1231,7 @@ def preset_catalog_add(
})
config["catalogs"] = catalogs
config_path.write_text(yaml.dump(config, default_flow_style=False, sort_keys=False, allow_unicode=True), encoding="utf-8")
config_path.write_text(yaml.safe_dump(config, default_flow_style=False, sort_keys=False, allow_unicode=True), encoding="utf-8")
install_label = "install allowed" if install_allowed else "discovery only"
console.print(f"\n[green]✓[/green] Added catalog '[bold]{name}[/bold]' ({install_label})")
@@ -1226,7 +1271,7 @@ def preset_catalog_remove(
raise typer.Exit(1)
config["catalogs"] = catalogs
config_path.write_text(yaml.dump(config, default_flow_style=False, sort_keys=False, allow_unicode=True), encoding="utf-8")
config_path.write_text(yaml.safe_dump(config, default_flow_style=False, sort_keys=False, allow_unicode=True), encoding="utf-8")
console.print(f"[green]✓[/green] Removed catalog '{name}'")
if not catalogs:

View File

@@ -14,6 +14,7 @@ from __future__ import annotations
import urllib.error
import urllib.request
from fnmatch import fnmatch
from typing import Callable
from urllib.parse import urlparse
from . import get_provider
@@ -56,22 +57,36 @@ def _hostname_in_hosts(hostname: str, hosts: tuple[str, ...]) -> bool:
return any(p == hostname or fnmatch(hostname, p) for p in hosts)
class _StripAuthOnRedirect(urllib.request.HTTPRedirectHandler):
"""Drop ``Authorization`` when a redirect leaves the entry's declared hosts."""
RedirectValidator = Callable[[str, str], None]
def __init__(self, hosts: tuple[str, ...]) -> None:
class _StripAuthOnRedirect(urllib.request.HTTPRedirectHandler):
"""Drop ``Authorization`` when a redirect leaves trusted hosts or downgrades."""
def __init__(
self,
hosts: tuple[str, ...],
redirect_validator: RedirectValidator | None = None,
) -> None:
super().__init__()
self._hosts = hosts
self._redirect_validator = redirect_validator
def redirect_request(self, req, fp, code, msg, headers, newurl):
if self._redirect_validator is not None:
self._redirect_validator(req.full_url, newurl)
original_auth = (
req.get_header("Authorization")
or req.unredirected_hdrs.get("Authorization")
)
new_req = super().redirect_request(req, fp, code, msg, headers, newurl)
if new_req is not None:
hostname = (urlparse(newurl).hostname or "").lower()
if _hostname_in_hosts(hostname, self._hosts):
old_scheme = urlparse(req.full_url).scheme
new_parsed = urlparse(newurl)
hostname = (new_parsed.hostname or "").lower()
is_https_downgrade = old_scheme == "https" and new_parsed.scheme != "https"
if _hostname_in_hosts(hostname, self._hosts) and not is_https_downgrade:
if original_auth:
new_req.add_unredirected_header("Authorization", original_auth)
else:
@@ -103,7 +118,12 @@ def build_request(url: str, extra_headers: dict[str, str] | None = None) -> urll
return urllib.request.Request(url, headers=headers)
def open_url(url: str, timeout: int = 10, extra_headers: dict[str, str] | None = None):
def open_url(
url: str,
timeout: int = 10,
extra_headers: dict[str, str] | None = None,
redirect_validator: RedirectValidator | None = None,
):
"""Open *url* with config-driven auth, redirect stripping, and fallthrough.
1. Find ``auth.json`` entries whose hosts match the URL.
@@ -113,6 +133,8 @@ def open_url(url: str, timeout: int = 10, extra_headers: dict[str, str] | None =
5. Non-auth errors (404, 500, network) raise immediately.
*extra_headers* (e.g. ``Accept``) are merged into every attempt.
*redirect_validator*, when provided, is called with ``(old_url, new_url)``
before following each redirect and may raise to reject the redirect.
"""
entries = find_entries_for_url(url, _load_config())
@@ -135,7 +157,7 @@ def open_url(url: str, timeout: int = 10, extra_headers: dict[str, str] | None =
continue
req = _make_req(provider.auth_headers(token, entry.auth))
opener = urllib.request.build_opener(_StripAuthOnRedirect(entry.hosts))
opener = urllib.request.build_opener(_StripAuthOnRedirect(entry.hosts, redirect_validator))
try:
return opener.open(req, timeout=timeout)
except urllib.error.HTTPError as exc:
@@ -146,4 +168,7 @@ def open_url(url: str, timeout: int = 10, extra_headers: dict[str, str] | None =
# No entry worked (or none matched) — unauthenticated fallback
req = _make_req({})
if redirect_validator is not None:
opener = urllib.request.build_opener(_StripAuthOnRedirect((), redirect_validator))
return opener.open(req, timeout=timeout)
return urllib.request.urlopen(req, timeout=timeout) # noqa: S310

View File

@@ -793,6 +793,35 @@ class TestRedirectStripping:
assert new_req.headers.get("Authorization") is None
assert new_req.unredirected_hdrs.get("Authorization") is None
def test_https_to_http_same_host_redirect_strips_auth(self):
from specify_cli.authentication.http import _StripAuthOnRedirect
from urllib.request import Request
import io
handler = _StripAuthOnRedirect(("github.com",))
req = Request("https://github.com/org/repo", headers={"Authorization": "Bearer tok"})
new_req = handler.redirect_request(req, io.BytesIO(b""), 302, "Found", {},
"http://github.com/org/repo")
assert new_req is not None
assert new_req.headers.get("Authorization") is None
assert new_req.unredirected_hdrs.get("Authorization") is None
def test_redirect_validator_can_reject_before_following_redirect(self):
import urllib.error
from specify_cli.authentication.http import _StripAuthOnRedirect
from urllib.request import Request
import io
def reject_http(old_url, new_url):
if new_url.startswith("http://"):
raise urllib.error.URLError("scheme downgrade")
handler = _StripAuthOnRedirect(("github.com",), reject_http)
req = Request("https://github.com/org/repo", headers={"Authorization": "Bearer tok"})
with pytest.raises(urllib.error.URLError, match="scheme downgrade"):
handler.redirect_request(req, io.BytesIO(b""), 302, "Found", {},
"http://github.com/org/repo")
def test_multi_hop_redirect_within_hosts_preserves_auth(self):
"""Auth survives a multi-hop redirect chain within allowed hosts."""
from specify_cli.authentication.http import _StripAuthOnRedirect

View File

@@ -187,4 +187,4 @@ class TestResolveGitHubReleaseAssetApiUrl:
capturing_open,
)
assert len(captured_urls) == 1
assert "releases/tags/v1%23beta" in captured_urls[0]
assert "releases/tags/v1%23beta" in captured_urls[0]

View File

@@ -11,6 +11,7 @@ Tests cover:
"""
import pytest
import io
import json
import tempfile
import shutil
@@ -18,6 +19,7 @@ import warnings
import zipfile
from pathlib import Path
from datetime import datetime, timezone
from types import SimpleNamespace
import yaml
@@ -4257,6 +4259,141 @@ class TestBundledPresetLocator:
assert "Lean Workflow" in result.output
assert "installed" in result.output.lower()
def test_preset_add_from_url_rejects_insecure_redirect(self, project_dir, monkeypatch):
"""URL installs reject redirects from HTTPS to non-loopback HTTP."""
import typer
from specify_cli import preset_add
class FakeResponse(io.BytesIO):
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def geturl(self):
return "http://example.com/preset.zip"
monkeypatch.setattr("specify_cli._require_specify_project", lambda: project_dir)
monkeypatch.setattr("specify_cli.get_speckit_version", lambda: "0.6.0")
def fake_open_url(url, timeout=None, extra_headers=None, redirect_validator=None):
assert redirect_validator is not None
redirect_validator(url, "http://example.com/preset.zip")
return FakeResponse(b"zip")
monkeypatch.setattr("specify_cli.authentication.http.open_url", fake_open_url)
installed = False
def fake_install_from_zip(self, zip_path, speckit_version, priority=10):
nonlocal installed
installed = True
monkeypatch.setattr(PresetManager, "install_from_zip", fake_install_from_zip)
with pytest.raises(typer.Exit) as exc_info:
preset_add(preset_id=None, from_url="https://example.com/preset.zip", dev=None, priority=10)
assert exc_info.value.exit_code == 1
assert installed is False
def test_preset_add_from_url_rejects_hostless_https_url(self, project_dir):
"""URL installs reject HTTPS URLs without a hostname before downloading."""
from typer.testing import CliRunner
from unittest.mock import patch
from specify_cli import app
runner = CliRunner()
with patch.object(Path, "cwd", return_value=project_dir), \
patch("specify_cli.authentication.http.open_url") as open_url:
result = runner.invoke(app, ["preset", "add", "--from", "https:///preset.zip"])
assert result.exit_code == 1
output = strip_ansi(result.output)
assert "URL must use HTTPS with a hostname" in output
assert "got https://" not in output
open_url.assert_not_called()
def test_preset_add_from_url_redirect_error_describes_disallowed_url(self, project_dir, monkeypatch, capsys):
"""Redirect rejection message covers hostless HTTPS, not only non-HTTPS URLs."""
import typer
from specify_cli import preset_add
class FakeResponse(io.BytesIO):
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def geturl(self):
return "https:///preset.zip"
monkeypatch.setattr("specify_cli._require_specify_project", lambda: project_dir)
monkeypatch.setattr("specify_cli.get_speckit_version", lambda: "0.6.0")
monkeypatch.setattr(
"specify_cli.authentication.http.open_url",
lambda url, timeout=None, extra_headers=None, redirect_validator=None: FakeResponse(b"zip"),
)
monkeypatch.setattr(PresetManager, "install_from_zip", lambda *args, **kwargs: None)
with pytest.raises(typer.Exit) as exc_info:
preset_add(preset_id=None, from_url="https://example.com/preset.zip", dev=None, priority=10)
assert exc_info.value.exit_code == 1
output = strip_ansi(capsys.readouterr().out)
assert "redirected to a disallowed URL" in output
assert "must use HTTPS with a hostname" in output
def test_preset_add_from_url_streams_download_to_zip(self, project_dir, monkeypatch):
"""URL installs stream response bytes to disk before installing the ZIP."""
from specify_cli import preset_add
class FakeResponse(io.BytesIO):
def __init__(self, data):
super().__init__(data)
self.read_sizes = []
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def geturl(self):
return "https://example.com/preset.zip"
def read(self, size=-1):
assert size not in (-1, None)
self.read_sizes.append(size)
return super().read(size)
response = FakeResponse(b"zip-bytes")
installed = {}
def fake_install_from_zip(self, zip_path, speckit_version, priority=10):
installed["zip_bytes"] = Path(zip_path).read_bytes()
installed["speckit_version"] = speckit_version
installed["priority"] = priority
return SimpleNamespace(name="Test Preset", version="1.0.0")
monkeypatch.setattr("specify_cli._require_specify_project", lambda: project_dir)
monkeypatch.setattr("specify_cli.get_speckit_version", lambda: "0.6.0")
monkeypatch.setattr(
"specify_cli.authentication.http.open_url",
lambda url, timeout=None, extra_headers=None, redirect_validator=None: response,
)
monkeypatch.setattr(PresetManager, "install_from_zip", fake_install_from_zip)
preset_add(preset_id=None, from_url="https://example.com/preset.zip", dev=None, priority=7)
assert response.read_sizes
assert installed == {
"zip_bytes": b"zip-bytes",
"speckit_version": "0.6.0",
"priority": 7,
}
def test_bundled_preset_in_catalog(self):
"""Verify the lean preset is listed in catalog.json with bundled marker."""
catalog_path = Path(__file__).parent.parent / "presets" / "catalog.json"
@@ -4346,7 +4483,7 @@ class TestPresetAddFromUrlResolution:
def __exit__(self, *a):
return False
def fake_open_url(url, timeout=None, extra_headers=None):
def fake_open_url(url, timeout=None, extra_headers=None, redirect_validator=None):
captured_urls.append((url, extra_headers))
if "releases/tags/" in url:
return FakeResponse(json.dumps({
@@ -4404,7 +4541,7 @@ class TestPresetAddFromUrlResolution:
def __exit__(self, *a):
return False
def fake_open_url(url, timeout=None, extra_headers=None):
def fake_open_url(url, timeout=None, extra_headers=None, redirect_validator=None):
captured_urls.append((url, extra_headers))
return FakeResponse(zip_bytes)