nanobot/nanobot/security/network.py
chengyongru 288146315e fix(security): normalize IPv6-mapped IPv4 in loopback check, add tests
- Apply _normalize_addr in _is_allowed_loopback_target so
  ::ffff:127.0.0.1 is correctly identified as loopback
- Add test for contains_internal_url with IPv6-mapped addresses
- Add test for whitelist + IPv6-mapped CGNAT interaction
2026-05-30 15:34:49 +08:00

160 lines
5.4 KiB
Python

"""Network security utilities — SSRF protection and internal URL detection."""
from __future__ import annotations
import ipaddress
import re
import socket
from contextlib import suppress
from urllib.parse import urlparse
# Destination networks that are refused by default. An address is treated as
# internal when it falls inside any of these ranges (see ``_is_private``).
_BLOCKED_NETWORKS = [
    ipaddress.ip_network("0.0.0.0/8"),  # "this network" / IPv4 unspecified
    ipaddress.ip_network("10.0.0.0/8"),  # RFC 1918 private
    ipaddress.ip_network("100.64.0.0/10"),  # carrier-grade NAT
    ipaddress.ip_network("127.0.0.0/8"),  # IPv4 loopback
    ipaddress.ip_network("169.254.0.0/16"),  # link-local / cloud metadata
    ipaddress.ip_network("172.16.0.0/12"),  # RFC 1918 private
    ipaddress.ip_network("192.168.0.0/16"),  # RFC 1918 private
    # IPv6 unspecified address: the v6 counterpart of 0.0.0.0, which is
    # already blocked above, so both families are treated the same way.
    ipaddress.ip_network("::/128"),
    ipaddress.ip_network("::1/128"),  # IPv6 loopback
    ipaddress.ip_network("fc00::/7"),  # unique local
    ipaddress.ip_network("fe80::/10"),  # link-local v6
]

# Finds http(s) URLs embedded in free text such as shell commands. A match
# ends at whitespace, quotes, backticks, or the shell metacharacters ; | < >.
_URL_RE = re.compile(r"https?://[^\s\"'`;|<>]+", re.IGNORECASE)

# Networks explicitly exempted from blocking. Empty until
# ``configure_ssrf_whitelist`` replaces it.
_allowed_networks: list[ipaddress.IPv4Network | ipaddress.IPv6Network] = []
def configure_ssrf_whitelist(cidrs: list[str]) -> None:
    """Replace the set of CIDR ranges exempt from SSRF blocking.

    Useful for overlay networks such as Tailscale (``100.64.0.0/10``) that
    live inside an otherwise blocked range. Entries that are not valid
    networks are skipped silently; host bits are tolerated (``strict=False``).
    Each call discards the previously configured ranges.
    """
    global _allowed_networks
    parsed = []
    for entry in cidrs:
        try:
            network = ipaddress.ip_network(entry, strict=False)
        except ValueError:
            # Invalid entry: ignore it and keep processing the rest.
            continue
        parsed.append(network)
    _allowed_networks = parsed
def _normalize_addr(
    addr: ipaddress.IPv4Address | ipaddress.IPv6Address,
) -> ipaddress.IPv4Address | ipaddress.IPv6Address:
    """Collapse an IPv4-mapped IPv6 address to the IPv4 address it embeds.

    ``::ffff:127.0.0.1`` denotes the same host as ``127.0.0.1``, yet as an
    ``IPv6Address`` it is a member of neither ``127.0.0.0/8`` nor ``::1/128``.
    Unwrapping it first lets the IPv4 block/allow lists apply. Any other
    address is returned unchanged.
    """
    if not isinstance(addr, ipaddress.IPv6Address):
        return addr
    embedded = addr.ipv4_mapped
    return embedded if embedded is not None else addr
def _is_private(addr: ipaddress.IPv4Address | ipaddress.IPv6Address) -> bool:
    """Return True when *addr* is in a blocked range and not whitelisted.

    The address is normalized first, so IPv4-mapped IPv6 forms are judged by
    their embedded IPv4 address. A match in the configured allow list takes
    precedence over the block list.
    """
    candidate = _normalize_addr(addr)
    for allowed in _allowed_networks:
        if candidate in allowed:
            return False
    for blocked in _BLOCKED_NETWORKS:
        if candidate in blocked:
            return True
    return False
def validate_url_target(url: str, *, allow_loopback: bool = False) -> tuple[bool, str]:
    """Validate a URL is safe to fetch: scheme, hostname, and resolved IPs.

    The URL must use http or https and have a hostname. The hostname is
    resolved with ``socket.getaddrinfo`` and every resolved address is checked
    against the block/allow lists; a single internal address rejects the URL.

    ``allow_loopback`` is intentionally narrow: it only permits literal
    loopback hosts (localhost, 127.0.0.0/8, ::1) when every resolved address is
    loopback. It does not allow RFC1918, link-local, metadata, or public DNS
    names that happen to resolve to loopback.

    Args:
        url: The URL to check.
        allow_loopback: Permit literal loopback targets as described above.

    Returns:
        ``(ok, error_message)``. When ok is True, error_message is empty.
    """
    try:
        p = urlparse(url)
    except Exception as e:
        return False, str(e)

    if p.scheme not in ("http", "https"):
        return False, f"Only http/https allowed, got '{p.scheme or 'none'}'"
    if not p.netloc:
        return False, "Missing domain"

    hostname = p.hostname
    if not hostname:
        return False, "Missing hostname"

    try:
        infos = socket.getaddrinfo(hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM)
    except (socket.gaierror, UnicodeError):
        # UnicodeError: getaddrinfo IDNA-encodes the name and rejects labels
        # that are empty or too long before any lookup happens. Treat that the
        # same as an unresolvable name rather than crashing on untrusted input.
        return False, f"Cannot resolve hostname: {hostname}"

    addrs: list[ipaddress.IPv4Address | ipaddress.IPv6Address] = []
    for info in infos:
        try:
            addrs.append(ipaddress.ip_address(info[4][0]))
        except ValueError:
            continue

    # Fail closed: if nothing could be parsed there is nothing we verified,
    # so the target must not be reported as safe.
    if not addrs:
        return False, f"Blocked: {hostname} did not resolve to any verifiable address"

    if allow_loopback and _is_allowed_loopback_target(hostname, addrs):
        return True, ""

    for addr in addrs:
        if _is_private(addr):
            return False, f"Blocked: {hostname} resolves to private/internal address {addr}"
    return True, ""
def validate_resolved_url(url: str) -> tuple[bool, str]:
    """Validate an already-fetched URL (e.g. the final URL after redirects).

    If the host is an IP literal it is checked directly. Otherwise the host
    name is resolved and every resolved address is checked. Unlike
    ``validate_url_target`` this check is lenient: a URL that cannot be
    parsed, has no hostname, or whose hostname cannot be resolved is accepted,
    and the scheme is not inspected.

    Returns:
        ``(ok, error_message)``. When ok is True, error_message is empty.
    """
    try:
        p = urlparse(url)
    except Exception:
        return True, ""

    hostname = p.hostname
    if not hostname:
        return True, ""

    try:
        literal = ipaddress.ip_address(hostname)
    except ValueError:
        literal = None  # not an IP literal; resolve it as a domain name below

    if literal is not None:
        if _is_private(literal):
            return False, f"Redirect target is a private address: {literal}"
        return True, ""

    try:
        infos = socket.getaddrinfo(hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM)
    except (socket.gaierror, UnicodeError):
        # UnicodeError: getaddrinfo IDNA-encodes the name and rejects labels
        # that are empty or too long. Handle it like any other failed lookup
        # instead of letting it propagate.
        return True, ""

    for info in infos:
        try:
            addr = ipaddress.ip_address(info[4][0])
        except ValueError:
            continue
        if _is_private(addr):
            return False, f"Redirect target {hostname} resolves to private address {addr}"
    return True, ""
def contains_internal_url(command: str, *, allow_loopback: bool = False) -> bool:
    """Report whether *command* embeds an http(s) URL that fails validation.

    Every URL found in the string is passed to ``validate_url_target`` with
    the given ``allow_loopback`` setting; scanning stops at the first URL that
    is rejected. A string without any URL yields False.
    """
    return any(
        not validate_url_target(found.group(0), allow_loopback=allow_loopback)[0]
        for found in _URL_RE.finditer(command)
    )
def _is_allowed_loopback_target(
    hostname: str,
    addrs: list[ipaddress.IPv4Address | ipaddress.IPv6Address],
) -> bool:
    """Return True when *hostname* is a literal loopback host resolving only to loopback.

    Both conditions must hold:

    * ``addrs`` is non-empty and every address in it is loopback, and
    * ``hostname`` is ``localhost`` (case-insensitive, trailing dots ignored)
      or an IP literal that is itself loopback.

    IPv4-mapped IPv6 addresses are normalized before either check, so
    ``::ffff:127.0.0.1`` is treated like ``127.0.0.1`` both as a resolved
    address and as a literal hostname.
    """
    if not addrs or not all(_normalize_addr(addr).is_loopback for addr in addrs):
        return False

    normalized = hostname.rstrip(".").lower()
    if normalized == "localhost":
        return True

    try:
        literal = ipaddress.ip_address(hostname)
    except ValueError:
        # A DNS name other than localhost never qualifies, even if it
        # happens to resolve to loopback.
        return False
    # Normalize here as well, for consistency with the address check above.
    return _normalize_addr(literal).is_loopback