""" GeoIP resolution via ip-api.com (free, no key required). - Batch endpoint: up to 100 IPs per POST request - Rate limit: 15 requests/min on the free tier (= 1500 IPs/min, plenty) - Cache TTL: 24 hours (geo data for an IP never changes meaningfully) - Private/loopback IPs are excluded entirely """ from __future__ import annotations import asyncio import ipaddress import json import logging import time from typing import Optional from urllib.request import urlopen, Request log = logging.getLogger(__name__) TTL_SECONDS = 86400 # 24 hours # fields we request from ip-api.com _FIELDS = "status,query,countryCode,city,lat,lon" _BATCH_URL = f"http://ip-api.com/batch?fields={_FIELDS}" _BATCH_SIZE = 100 # ip-api.com hard limit def _is_private(ip_str: str) -> bool: """Return True for loopback, private, link-local, or unspecified addresses.""" try: addr = ipaddress.ip_address(ip_str) return ( addr.is_private or addr.is_loopback or addr.is_link_local or addr.is_unspecified or addr.is_reserved ) except ValueError: return True # unparseable → skip class GeoIPCache: """ Thread-safe async GeoIP cache backed by ip-api.com batch endpoint. Usage: cache = GeoIPCache() results = await cache.lookup(["1.2.3.4", "5.6.7.8"]) """ def __init__(self) -> None: # ip -> (timestamp, GeoResult) self._cache: dict[str, tuple[float, GeoResult]] = {} self._lock = asyncio.Lock() async def lookup(self, ips: list[str]) -> dict[str, "GeoResult"]: """ Resolve a list of IPs. Returns a dict of ip -> GeoResult. Private IPs are excluded from the result entirely. Cache hits never trigger a network call. """ public_ips = [ip for ip in ips if not _is_private(ip)] if not public_ips: return {} now = time.monotonic() results: dict[str, GeoResult] = {} missing: list[str] = [] async with self._lock: for ip in public_ips: entry = self._cache.get(ip) if entry and (now - entry[0]) < TTL_SECONDS: results[ip] = entry[1] else: missing.append(ip) if missing: log.debug("GeoIP cache miss for %d IP(s): %s", len(missing), missing) fetched = await _fetch_batch(missing) async with self._lock: for ip, geo in fetched.items(): self._cache[ip] = (time.monotonic(), geo) results[ip] = geo return results def cache_size(self) -> int: return len(self._cache) class GeoResult: __slots__ = ("ip", "country", "city", "lat", "lon") def __init__(self, ip: str, country: str, city: str, lat: float, lon: float) -> None: self.ip = ip self.country = country self.city = city self.lat = lat self.lon = lon def __repr__(self) -> str: return f"GeoResult({self.ip} → {self.city}, {self.country} [{self.lat},{self.lon}])" async def _fetch_batch(ips: list[str]) -> dict[str, GeoResult]: """ POST up to 100 IPs to ip-api.com/batch and return parsed results. Splits into multiple requests if needed (shouldn't happen in practice for a game server, but correct to handle it). """ results: dict[str, GeoResult] = {} loop = asyncio.get_running_loop() for chunk_start in range(0, len(ips), _BATCH_SIZE): chunk = ips[chunk_start:chunk_start + _BATCH_SIZE] try: geo_map = await loop.run_in_executor(None, _post_batch, chunk) results.update(geo_map) except Exception as exc: log.warning("GeoIP batch fetch failed: %s", exc) return results def _post_batch(ips: list[str]) -> dict[str, GeoResult]: """Synchronous HTTP POST — runs in executor thread.""" payload = json.dumps(ips).encode() req = Request( _BATCH_URL, data=payload, headers={"Content-Type": "application/json"}, method="POST", ) with urlopen(req, timeout=10) as resp: data = json.loads(resp.read()) results: dict[str, GeoResult] = {} for entry in data: if entry.get("status") != "success": log.debug("GeoIP failed for %s: %s", entry.get("query"), entry.get("message")) continue ip = entry["query"] results[ip] = GeoResult( ip=ip, country=entry.get("countryCode", ""), city=entry.get("city", ""), lat=float(entry.get("lat", 0.0)), lon=float(entry.get("lon", 0.0)), ) return results