Files
github-spec-kit/src/specify_cli/authentication/http.py
darion-yaphet 36ad3cde1b fix(presets): harden preset URL installs against unsafe redirects (#2911)
* Harden preset URL installs against unsafe redirects

Preset URL installs already rejected non-HTTPS source URLs, but the authenticated opener follows redirects. Validate the final response URL before writing the ZIP, preserve GitHub release asset URL resolution after the preset command module split, stream the response to disk, and keep catalog config serialization on safe YAML output.

Constraint: open_url follows redirects, so source URL validation alone does not constrain the downloaded target

Rejected: Keep response.read() for simplicity | large preset downloads should not be buffered entirely in memory

Confidence: high

Scope-risk: narrow

Directive: Keep preset URL policy aligned with workflow installer redirect validation

Tested: uvx ruff check src/specify_cli/__init__.py src/specify_cli/presets/__init__.py src/specify_cli/presets/_commands.py tests/test_presets.py

Tested: uv run pytest tests/test_presets.py -q

Not-tested: Real network redirect integration against a live HTTP server

Co-authored-by: OmX <omx@oh-my-codex.dev>

* Reject malformed preset download URLs

Preset downloads should fail early when a URL lacks a hostname, even if the scheme is HTTPS. The redirect error now describes any disallowed target instead of implying that only non-HTTPS redirects are blocked.

* Prevent credentialed preset redirects from downgrading transport

Preset URL downloads already checked the final URL after urllib followed redirects, but that was too late for authenticated requests because same-host redirects could preserve Authorization during the redirect itself. The authenticated HTTP helper now supports an opt-in redirect validator, and preset downloads use it to reject disallowed redirect targets before following them. The redirect auth handlers also stop preserving credentials across HTTPS to non-HTTPS downgrades as defense in depth.

* test(presets): 修复 URL 解析测试 mock 缺少 redirect_validator 参数

重定向安全加固为 open_url 新增 redirect_validator 参数,
两处 fake_open_url mock 签名未同步导致 TypeError。
补齐参数后全部 3717 个测试通过。

---------

Co-authored-by: OmX <omx@oh-my-codex.dev>
2026-06-11 07:21:50 -05:00

175 lines
6.6 KiB
Python

"""Authenticated HTTP helpers driven by ``~/.specify/auth.json``.
No credentials are sent unless the user has created ``auth.json``.
For each outbound URL the helper matches the hostname against
configured entries, resolves the token via the appropriate provider
class, and attaches auth headers. Redirect safety is enforced:
the ``Authorization`` header is stripped when a redirect leaves the
entry's declared hosts or downgrades from HTTPS to a non-HTTPS scheme.
On 401/403 the next matching entry is tried, then unauthenticated.
"""
from __future__ import annotations
import urllib.error
import urllib.request
from fnmatch import fnmatch
from typing import Callable
from urllib.parse import urlparse
from . import get_provider
from .config import AuthConfigEntry, _default_config_path, find_entries_for_url, load_auth_config
# Explicit entry list that, when not None, _load_config() returns instead of
# reading auth.json (that function's docstring describes it as a testing hook).
_config_override: list[AuthConfigEntry] | None = None
# Per-process cache that _load_config() fills on its first call.
_config_cache: list[AuthConfigEntry] | None = None # None = not yet loaded
def _load_config() -> list[AuthConfigEntry]:
    """Return the auth entries to use, reading ``auth.json`` lazily.

    A non-``None`` ``_config_override`` (for testing) is returned as-is and
    bypasses the cache. Otherwise the file is loaded on first use and the
    result is kept in ``_config_cache`` for the rest of the process, so a
    malformed file produces its warning only once.

    When loading raises ``ValueError`` or ``OSError`` a ``UserWarning`` is
    emitted and an empty list is cached, so no entries are available for
    later requests.
    """
    global _config_cache

    # The override always wins and is never written into the cache.
    if _config_override is not None:
        return _config_override

    if _config_cache is None:
        try:
            loaded = load_auth_config()
        except (ValueError, OSError) as exc:
            # Imported lazily: only needed on this failure path.
            import warnings

            config_path = _default_config_path()
            message = (
                f"Failed to load {config_path}: {exc}. "
                "All requests will be unauthenticated."
            )
            warnings.warn(message, UserWarning, stacklevel=2)
            # Cache the empty result so the failing load is not retried.
            loaded = []
        _config_cache = loaded

    return _config_cache
def _hostname_in_hosts(hostname: str, hosts: tuple[str, ...]) -> bool:
    """Report whether *hostname* is covered by any entry of *hosts*.

    The hostname is lower-cased first. Each entry is then tried as an exact
    string and as an ``fnmatch`` glob pattern; the entries themselves are
    used as given. An empty *hosts* tuple never matches.
    """
    candidate = hostname.lower()
    for pattern in hosts:
        # Exact comparison first: it also covers entries containing glob
        # metacharacters that should be taken literally.
        if pattern == candidate:
            return True
        if fnmatch(candidate, pattern):
            return True
    return False
# Callback type for redirect checks: invoked as (old_url, new_url) before a
# redirect is followed; it returns nothing and may raise to reject the redirect.
RedirectValidator = Callable[[str, str], None]
class _StripAuthOnRedirect(urllib.request.HTTPRedirectHandler):
    """Redirect handler that keeps ``Authorization`` only on safe hops.

    The header survives a redirect only when the target hostname matches one
    of the trusted host patterns and the hop is not an HTTPS to non-HTTPS
    downgrade. In every other case it is removed from the follow-up request.
    """

    def __init__(
        self,
        hosts: tuple[str, ...],
        redirect_validator: RedirectValidator | None = None,
    ) -> None:
        """Store the trusted host patterns and the optional redirect validator."""
        super().__init__()
        self._redirect_validator = redirect_validator
        self._hosts = hosts

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        """Build the follow-up request for a redirect from *req* to *newurl*.

        The validator, when configured, runs first with
        ``(req.full_url, newurl)`` and may raise to abort the redirect.
        The base class then builds the new request; whatever it returns
        (including ``None``) is returned after the ``Authorization`` header
        has been re-attached or stripped as described on the class.
        """
        validator = self._redirect_validator
        if validator is not None:
            validator(req.full_url, newurl)

        # Capture the credential before delegating, checking both header
        # stores of the original request.
        saved_auth = req.get_header("Authorization")
        if not saved_auth:
            saved_auth = req.unredirected_hdrs.get("Authorization")

        redirected = super().redirect_request(req, fp, code, msg, headers, newurl)
        if redirected is None:
            return None

        source_scheme = urlparse(req.full_url).scheme
        target = urlparse(newurl)
        target_host = (target.hostname or "").lower()
        downgraded = source_scheme == "https" and target.scheme != "https"
        host_trusted = _hostname_in_hosts(target_host, self._hosts)

        if not host_trusted or downgraded:
            # Untrusted destination or weaker transport: remove the
            # credential from both header stores of the new request.
            redirected.headers.pop("Authorization", None)
            redirected.unredirected_hdrs.pop("Authorization", None)
        elif saved_auth:
            redirected.add_unredirected_header("Authorization", saved_auth)
        return redirected
def build_request(url: str, extra_headers: dict[str, str] | None = None) -> urllib.request.Request:
    """Create a :class:`~urllib.request.Request` for *url*, authenticated if possible.

    Entries matching *url* are taken from the loaded auth config in order;
    the first one whose provider is known and whose token resolves supplies
    the auth headers. When none qualifies the request carries only the
    caller's headers.

    Any ``Authorization`` key in *extra_headers* (compared case-insensitively)
    is discarded, and auth headers are applied last so callers cannot
    override them.
    """
    # Caller-supplied headers go in first, minus any Authorization attempt.
    merged: dict[str, str] = {}
    for name, value in (extra_headers or {}).items():
        if name.lower() != "authorization":
            merged[name] = value

    candidates = find_entries_for_url(url, _load_config())
    for candidate in candidates:
        provider = get_provider(candidate.provider)
        if provider is None:
            continue
        token = provider.resolve_token(candidate)
        if not token:
            continue
        # First usable entry wins; later entries are not consulted.
        merged.update(provider.auth_headers(token, candidate.auth))
        break

    return urllib.request.Request(url, headers=merged)
def open_url(
    url: str,
    timeout: int = 10,
    extra_headers: dict[str, str] | None = None,
    redirect_validator: RedirectValidator | None = None,
):
    """Open *url*, trying each matching auth entry before going unauthenticated.

    Order of attempts:

    1. Every auth config entry matching *url*, in order, skipping entries
       whose provider is unknown or whose token does not resolve.
    2. A 401 or 403 response closes that error and moves to the next entry.
    3. Once the entries are exhausted (or none matched), one unauthenticated
       request is made.

    Any other ``HTTPError`` is re-raised immediately, and other exception
    types are not caught here.

    *extra_headers* (e.g. ``Accept``) are merged into every attempt, with any
    ``Authorization`` key removed. *redirect_validator*, when provided, is
    called with ``(old_url, new_url)`` before each redirect is followed and
    may raise to reject it.
    """
    matching = find_entries_for_url(url, _load_config())

    def _request_with(auth_headers: dict[str, str]) -> urllib.request.Request:
        # Drop caller-supplied Authorization so it cannot bypass the config,
        # then apply auth headers last so they always take precedence.
        combined = {
            name: value
            for name, value in (extra_headers or {}).items()
            if name.lower() != "authorization"
        }
        combined.update(auth_headers)
        return urllib.request.Request(url, headers=combined)

    for entry in matching:
        provider = get_provider(entry.provider)
        if provider is None:
            continue
        token = provider.resolve_token(entry)
        if not token:
            continue

        request = _request_with(provider.auth_headers(token, entry.auth))
        handler = _StripAuthOnRedirect(entry.hosts, redirect_validator)
        authed_opener = urllib.request.build_opener(handler)
        try:
            return authed_opener.open(request, timeout=timeout)
        except urllib.error.HTTPError as exc:
            if exc.code not in (401, 403):
                raise
            # Credential rejected: release the response and try the next entry.
            exc.close()

    # No entry worked (or none matched): fall back to an unauthenticated request.
    fallback = _request_with({})
    if redirect_validator is None:
        return urllib.request.urlopen(fallback, timeout=timeout)  # noqa: S310
    # A validator was supplied, so redirects must still be vetted; the empty
    # host tuple means no redirect target counts as trusted.
    guarded_opener = urllib.request.build_opener(
        _StripAuthOnRedirect((), redirect_validator)
    )
    return guarded_opener.open(fallback, timeout=timeout)