Files
github-spec-kit/src/specify_cli/authentication/http.py
Si Zengyu 1add20341d fix(extensions,presets,workflows): resolve private GHES release assets via /api/v3 (#3157)
* feat(auth): add github_provider_hosts() to enumerate GHES hosts from auth.json

Assisted-by: Claude Code (model: claude-sonnet-4-6, autonomous)

* fix(extensions): resolve GHES release assets via /api/v3

Generalizes resolve_github_release_asset_api_url to GitHub Enterprise
Server hosts (gated by auth.json github hosts), fixing private GHES
extension/preset downloads. github/spec-kit#3147

Assisted-by: Claude Code (model: claude-sonnet-4-6, autonomous)

* fix(extensions,presets): pass auth.json github hosts into release resolver

Assisted-by: Claude Code (model: claude-sonnet-4-6, autonomous)

* docs(auth): document GHES private catalog + release-asset auth

Assisted-by: Claude Code (model: claude-sonnet-4-6, autonomous)

* fix(presets,workflows): pass auth.json github hosts into remaining release resolvers

Wires preset add --from and workflow add through github_provider_hosts()
so private GHES release assets resolve via /api/v3 there too. github/spec-kit#3147

Assisted-by: Claude Code (model: claude-sonnet-4-6, autonomous)

* test(presets): use module-level io.BytesIO in GHES preset test

Addresses Copilot review on PR #3157: drop unnecessary __import__("io")
in test_preset_add_from_ghes_release_url_resolves_via_api_v3 since io is
already imported at module level.

* fix(github-http): pass through GHES asset API URLs by path shape

Addresses Copilot review on PR #3157. A direct GHES /api/v3 release asset
URL was only returned as already-resolved when its host was in the
allowlist; otherwise the resolver returned None and the caller downloaded
the same URL without 'Accept: application/octet-stream', fetching JSON
metadata instead of the binary.

Gate the passthrough on path shape alone, mirroring the github.com case.
This is safe: passthrough returns the input URL unchanged and the caller
fetches it either way, so no new request to an arbitrary host is induced;
the token stays independently gated by auth.json in open_url. The
allowlist remains the anti-SSRF gate on the tag-lookup resolving path.

Add test_passthrough_for_unlisted_ghes_api_asset_url.
2026-06-25 10:44:30 -05:00

189 lines
7.1 KiB
Python

"""Authenticated HTTP helpers driven by ``~/.specify/auth.json``.
No credentials are sent unless the user has created ``auth.json``.
For each outbound URL the helper matches the hostname against
configured entries, resolves the token via the appropriate provider
class, and attaches auth headers. Redirect safety is enforced:
the ``Authorization`` header is stripped when a redirect leaves the
entry's declared hosts. On 401/403 the next matching entry is tried,
then unauthenticated.
"""
from __future__ import annotations
import urllib.error
import urllib.request
from fnmatch import fnmatch
from typing import Callable
from urllib.parse import urlparse
from . import get_provider
from .config import AuthConfigEntry, _default_config_path, find_entries_for_url, load_auth_config
_config_override: list[AuthConfigEntry] | None = None
_config_cache: list[AuthConfigEntry] | None = None # None = not yet loaded
def _load_config() -> list[AuthConfigEntry]:
"""Load auth config, using override if set (for testing).
The result is cached per-process so ``auth.json`` is read at most once,
and any warning about a malformed file fires only once.
"""
global _config_cache
if _config_override is not None:
return _config_override
if _config_cache is not None:
return _config_cache
try:
_config_cache = load_auth_config()
except (ValueError, OSError) as exc:
import warnings
config_path = _default_config_path()
warnings.warn(
f"Failed to load {config_path}: {exc}. "
"All requests will be unauthenticated.",
UserWarning,
stacklevel=2,
)
_config_cache = []
return _config_cache
def _hostname_in_hosts(hostname: str, hosts: tuple[str, ...]) -> bool:
"""Return True if *hostname* matches any pattern in *hosts*."""
hostname = hostname.lower()
return any(p == hostname or fnmatch(hostname, p) for p in hosts)
RedirectValidator = Callable[[str, str], None]
class _StripAuthOnRedirect(urllib.request.HTTPRedirectHandler):
"""Drop ``Authorization`` when a redirect leaves trusted hosts or downgrades."""
def __init__(
self,
hosts: tuple[str, ...],
redirect_validator: RedirectValidator | None = None,
) -> None:
super().__init__()
self._hosts = hosts
self._redirect_validator = redirect_validator
def redirect_request(self, req, fp, code, msg, headers, newurl):
if self._redirect_validator is not None:
self._redirect_validator(req.full_url, newurl)
original_auth = (
req.get_header("Authorization")
or req.unredirected_hdrs.get("Authorization")
)
new_req = super().redirect_request(req, fp, code, msg, headers, newurl)
if new_req is not None:
old_scheme = urlparse(req.full_url).scheme
new_parsed = urlparse(newurl)
hostname = (new_parsed.hostname or "").lower()
is_https_downgrade = old_scheme == "https" and new_parsed.scheme != "https"
if _hostname_in_hosts(hostname, self._hosts) and not is_https_downgrade:
if original_auth:
new_req.add_unredirected_header("Authorization", original_auth)
else:
new_req.headers.pop("Authorization", None)
new_req.unredirected_hdrs.pop("Authorization", None)
return new_req
def build_request(url: str, extra_headers: dict[str, str] | None = None) -> urllib.request.Request:
"""Build a :class:`~urllib.request.Request`, attaching auth when config matches.
Uses the first matching entry from ``auth.json`` whose token resolves.
Returns a plain request when no entry matches or the file doesn't exist.
"""
headers: dict[str, str] = {}
if extra_headers:
# Strip Authorization from extra_headers to prevent bypass
headers.update({k: v for k, v in extra_headers.items() if k.lower() != "authorization"})
# Auth headers applied last — cannot be overridden by extra_headers
entries = find_entries_for_url(url, _load_config())
for entry in entries:
provider = get_provider(entry.provider)
if provider is None:
continue
token = provider.resolve_token(entry)
if token:
headers.update(provider.auth_headers(token, entry.auth))
break
return urllib.request.Request(url, headers=headers)
def github_provider_hosts() -> tuple[str, ...]:
"""Return host patterns from every ``github`` provider entry in ``auth.json``.
Used to classify which hosts are GitHub Enterprise Server instances when
resolving release-asset download URLs. Returns an empty tuple when no
``auth.json`` exists or it contains no ``github`` entries.
"""
hosts: list[str] = []
for entry in _load_config():
if entry.provider == "github":
hosts.extend(entry.hosts)
return tuple(hosts)
def open_url(
url: str,
timeout: int = 10,
extra_headers: dict[str, str] | None = None,
redirect_validator: RedirectValidator | None = None,
):
"""Open *url* with config-driven auth, redirect stripping, and fallthrough.
1. Find ``auth.json`` entries whose hosts match the URL.
2. For each entry, resolve the token and try the request.
3. On 401/403 move to the next matching entry.
4. After all entries exhausted (or none matched), try unauthenticated.
5. Non-auth errors (404, 500, network) raise immediately.
*extra_headers* (e.g. ``Accept``) are merged into every attempt.
*redirect_validator*, when provided, is called with ``(old_url, new_url)``
before following each redirect and may raise to reject the redirect.
"""
entries = find_entries_for_url(url, _load_config())
def _make_req(auth_headers: dict[str, str]) -> urllib.request.Request:
merged = {}
if extra_headers:
# Strip Authorization from extra_headers to prevent bypass
merged.update({k: v for k, v in extra_headers.items() if k.lower() != "authorization"})
# Auth headers applied last — cannot be overridden by extra_headers
merged.update(auth_headers)
return urllib.request.Request(url, headers=merged)
# Try each matching entry
for entry in entries:
provider = get_provider(entry.provider)
if provider is None:
continue
token = provider.resolve_token(entry)
if not token:
continue
req = _make_req(provider.auth_headers(token, entry.auth))
opener = urllib.request.build_opener(_StripAuthOnRedirect(entry.hosts, redirect_validator))
try:
return opener.open(req, timeout=timeout)
except urllib.error.HTTPError as exc:
if exc.code in (401, 403):
exc.close()
continue # try next entry
raise
# No entry worked (or none matched) — unauthenticated fallback
req = _make_req({})
if redirect_validator is not None:
opener = urllib.request.build_opener(_StripAuthOnRedirect((), redirect_validator))
return opener.open(req, timeout=timeout)
return urllib.request.urlopen(req, timeout=timeout) # noqa: S310