Files
Xonotic-Exporter/xonotic_exporter/geoip.py
T
DerGrumpf 76adff71d7 Init
2026-04-26 16:56:13 +00:00

155 lines
4.7 KiB
Python

"""
GeoIP resolution via ip-api.com (free, no key required).
- Batch endpoint: up to 100 IPs per POST request
- Rate limit: 15 requests/min on the free tier (= 1500 IPs/min, plenty)
- Cache TTL: 24 hours (geo data for an IP never changes meaningfully)
- Private/loopback IPs are excluded entirely
"""
from __future__ import annotations
import asyncio
import ipaddress
import json
import logging
import time
from typing import Optional
from urllib.request import urlopen, Request
log = logging.getLogger(__name__)
TTL_SECONDS = 86400 # 24 hours
# fields we request from ip-api.com
_FIELDS = "status,query,countryCode,city,lat,lon"
_BATCH_URL = f"http://ip-api.com/batch?fields={_FIELDS}"
_BATCH_SIZE = 100 # ip-api.com hard limit
def _is_private(ip_str: str) -> bool:
"""Return True for loopback, private, link-local, or unspecified addresses."""
try:
addr = ipaddress.ip_address(ip_str)
return (
addr.is_private
or addr.is_loopback
or addr.is_link_local
or addr.is_unspecified
or addr.is_reserved
)
except ValueError:
return True # unparseable → skip
class GeoIPCache:
    """
    Thread-safe async GeoIP cache backed by ip-api.com batch endpoint.
    Usage:
        cache = GeoIPCache()
        results = await cache.lookup(["1.2.3.4", "5.6.7.8"])
    """

    def __init__(self) -> None:
        # ip -> (timestamp, GeoResult); timestamps use time.monotonic()
        self._cache: dict[str, tuple[float, GeoResult]] = {}
        self._lock = asyncio.Lock()

    async def lookup(self, ips: list[str]) -> dict[str, "GeoResult"]:
        """
        Resolve a list of IPs. Returns a dict of ip -> GeoResult.
        Private IPs are excluded from the result entirely.
        Cache hits never trigger a network call.
        """
        candidates = [ip for ip in ips if not _is_private(ip)]
        if not candidates:
            return {}

        now = time.monotonic()
        resolved: dict[str, GeoResult] = {}
        stale: list[str] = []

        # Partition into fresh cache hits vs. entries needing a network fetch.
        async with self._lock:
            for ip in candidates:
                cached = self._cache.get(ip)
                if cached is not None and (now - cached[0]) < TTL_SECONDS:
                    resolved[ip] = cached[1]
                else:
                    stale.append(ip)

        if stale:
            log.debug("GeoIP cache miss for %d IP(s): %s", len(stale), stale)
            # Fetch outside the lock so cache hits on other tasks aren't blocked.
            fetched = await _fetch_batch(stale)
            async with self._lock:
                for ip, geo in fetched.items():
                    self._cache[ip] = (time.monotonic(), geo)
                    resolved[ip] = geo

        return resolved

    def cache_size(self) -> int:
        """Number of entries currently stored (fresh or expired)."""
        return len(self._cache)
class GeoResult:
    """Lightweight record for a single GeoIP lookup result."""

    # __slots__ keeps per-instance memory small — one of these exists per
    # unique public IP seen by the server.
    __slots__ = ("ip", "country", "city", "lat", "lon")

    def __init__(self, ip: str, country: str, city: str, lat: float, lon: float) -> None:
        self.ip = ip            # the queried IP address
        self.country = country  # country code from ip-api ("" if absent)
        self.city = city        # city name from ip-api ("" if absent)
        self.lat = lat          # latitude (0.0 if absent)
        self.lon = lon          # longitude (0.0 if absent)

    def __repr__(self) -> str:
        # Fix: the original f-string ran ip and city together with no
        # separator ("{self.ip}{self.city}"), producing e.g.
        # "GeoResult(1.2.3.4Paris, FR ...)". Insert an explicit ": ".
        return f"GeoResult({self.ip}: {self.city}, {self.country} [{self.lat},{self.lon}])"
async def _fetch_batch(ips: list[str]) -> dict[str, GeoResult]:
    """
    POST up to 100 IPs to ip-api.com/batch and return parsed results.
    Splits into multiple requests if needed (shouldn't happen in practice
    for a game server, but correct to handle it).
    """
    loop = asyncio.get_running_loop()
    merged: dict[str, GeoResult] = {}
    # Slice the input into <= _BATCH_SIZE chunks, one POST each.
    for start in range(0, len(ips), _BATCH_SIZE):
        batch = ips[start:start + _BATCH_SIZE]
        try:
            # urlopen is blocking — run it on the default executor thread.
            merged.update(await loop.run_in_executor(None, _post_batch, batch))
        except Exception as exc:
            # Best-effort: a failed batch is logged and skipped, not fatal.
            log.warning("GeoIP batch fetch failed: %s", exc)
    return merged
def _post_batch(ips: list[str]) -> dict[str, GeoResult]:
    """Synchronous HTTP POST — runs in executor thread."""
    request = Request(
        _BATCH_URL,
        data=json.dumps(ips).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urlopen(request, timeout=10) as resp:
        entries = json.loads(resp.read())

    parsed: dict[str, GeoResult] = {}
    for item in entries:
        # ip-api marks per-IP failures with status != "success".
        if item.get("status") != "success":
            log.debug("GeoIP failed for %s: %s", item.get("query"), item.get("message"))
            continue
        ip = item["query"]
        parsed[ip] = GeoResult(
            ip=ip,
            country=item.get("countryCode", ""),
            city=item.get("city", ""),
            lat=float(item.get("lat", 0.0)),
            lon=float(item.get("lon", 0.0)),
        )
    return parsed