155 lines
4.7 KiB
Python
155 lines
4.7 KiB
Python
"""
|
|
GeoIP resolution via ip-api.com (free, no key required).

- Batch endpoint: up to 100 IPs per POST request
- Rate limit: 15 requests/min on the free tier (= 1500 IPs/min, plenty)
- Cache TTL: 24 hours (geo data for an IP rarely changes meaningfully)
- Private/loopback IPs are excluded entirely
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import ipaddress
|
|
import json
|
|
import logging
|
|
import time
|
|
from typing import Optional
|
|
from urllib.request import urlopen, Request
|
|
|
|
# Module-level logger; handlers and levels are left to the application.
log = logging.getLogger(__name__)

# How long a resolved entry stays valid in the cache.
TTL_SECONDS = 86400  # 24 hours

# Fields we request from ip-api.com. "message" carries the failure reason
# and is only returned when it is requested explicitly; without it the
# reason logged by _post_batch for failed entries is always None.
_FIELDS = "status,message,query,countryCode,city,lat,lon"
_BATCH_URL = f"http://ip-api.com/batch?fields={_FIELDS}"
_BATCH_SIZE = 100  # ip-api.com hard limit
|
|
|
|
|
|
def _is_private(ip_str: str) -> bool:
    """Return True when *ip_str* must not be sent to the GeoIP service.

    That is the case for loopback, private, link-local, unspecified,
    reserved and multicast addresses, and for any value that does not
    parse as an IPv4 or IPv6 address at all.
    """
    try:
        addr = ipaddress.ip_address(ip_str)
    except ValueError:
        return True  # unparseable → skip

    return (
        addr.is_private
        or addr.is_loopback
        or addr.is_link_local
        or addr.is_unspecified
        or addr.is_reserved
        # Multicast groups (224.0.0.0/4, ff00::/8) do not identify a single
        # host, so there is nothing to geolocate.
        or addr.is_multicast
    )
|
|
|
|
|
|
class GeoIPCache:
    """
    Async GeoIP cache backed by the ip-api.com batch endpoint.

    Resolved entries are kept in memory and reused for TTL_SECONDS.
    Cache access is serialised with an asyncio.Lock, which coordinates
    coroutines running on one event loop; it does not make the object safe
    to share between OS threads.

    Usage:
        cache = GeoIPCache()
        results = await cache.lookup(["1.2.3.4", "5.6.7.8"])
    """

    def __init__(self) -> None:
        # ip -> (time.monotonic() value at insertion, GeoResult)
        self._cache: dict[str, tuple[float, GeoResult]] = {}
        self._lock = asyncio.Lock()

    async def lookup(self, ips: list[str]) -> dict[str, "GeoResult"]:
        """
        Resolve a list of IPs. Returns a dict of ip -> GeoResult.

        Private IPs (as decided by _is_private) are excluded from the result
        entirely, and so is any IP the fetch returned no result for.
        Duplicate IPs in *ips* are resolved once.
        Cache hits never trigger a network call.
        """
        # dict.fromkeys drops repeats while keeping first-seen order, so an IP
        # listed several times takes a single slot in the batch request.
        public_ips = list(dict.fromkeys(ip for ip in ips if not _is_private(ip)))
        if not public_ips:
            return {}

        now = time.monotonic()
        results: dict[str, GeoResult] = {}
        missing: list[str] = []

        async with self._lock:
            for ip in public_ips:
                entry = self._cache.get(ip)
                if entry is not None and (now - entry[0]) < TTL_SECONDS:
                    results[ip] = entry[1]
                else:
                    missing.append(ip)

        if missing:
            log.debug("GeoIP cache miss for %d IP(s): %s", len(missing), missing)
            # The fetch runs outside the lock so other lookups can proceed
            # meanwhile; two concurrent lookups that miss on the same IP may
            # therefore both fetch it.
            fetched = await _fetch_batch(missing)
            async with self._lock:
                for ip, geo in fetched.items():
                    self._cache[ip] = (time.monotonic(), geo)
                    results[ip] = geo

        return results

    def cache_size(self) -> int:
        """Return the number of stored entries, expired ones included."""
        return len(self._cache)
|
|
|
|
|
|
class GeoResult:
    """Location record for a single IP address.

    Holds exactly five attributes, stored in slots so instances carry no
    per-instance __dict__: ip, country, city, lat, lon.
    """

    __slots__ = ("ip", "country", "city", "lat", "lon")

    def __init__(self, ip: str, country: str, city: str, lat: float, lon: float) -> None:
        """Store the given values on the instance unchanged."""
        self.ip, self.country, self.city = ip, country, city
        self.lat, self.lon = lat, lon

    def __repr__(self) -> str:
        """Return a one-line summary: ip, then city, country and coordinates."""
        return f"GeoResult({self.ip} → {self.city}, {self.country} [{self.lat},{self.lon}])"
|
|
|
|
|
|
async def _fetch_batch(ips: list[str]) -> dict[str, GeoResult]:
    """
    Resolve *ips* through ip-api.com/batch, at most _BATCH_SIZE per request.

    Each slice is handed to _post_batch on the running loop's default
    executor, one slice after another. A slice whose request raises is
    logged at WARNING level and skipped; results from the other slices are
    still returned.
    """
    merged: dict[str, GeoResult] = {}
    running_loop = asyncio.get_running_loop()

    step = _BATCH_SIZE
    total = len(ips)
    offset = 0
    while offset < total:
        batch = ips[offset:offset + step]
        offset += step
        try:
            merged.update(
                await running_loop.run_in_executor(None, _post_batch, batch)
            )
        except Exception as exc:
            # Best effort: one failed request must not discard the rest.
            log.warning("GeoIP batch fetch failed: %s", exc)

    return merged
|
|
|
|
|
|
def _post_batch(ips: list[str]) -> dict[str, GeoResult]:
    """Synchronous HTTP POST — runs in executor thread.

    Sends *ips* as a JSON array to the batch endpoint and returns a dict of
    ip -> GeoResult for every entry the service reports as resolved.
    Returns an empty dict without any network traffic when *ips* is empty.

    Raises:
        OSError: the request failed or timed out (urllib's URLError and
            HTTPError are OSError subclasses).
        ValueError: the body is not valid JSON, or is not a JSON array.
    """
    if not ips:
        return {}

    payload = json.dumps(ips).encode()
    req = Request(
        _BATCH_URL,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urlopen(req, timeout=10) as resp:
        data = json.loads(resp.read())

    return _parse_batch_response(data)


def _parse_batch_response(data: object) -> dict[str, GeoResult]:
    """Turn a decoded batch response into a dict of ip -> GeoResult."""
    if not isinstance(data, list):
        raise ValueError(
            f"unexpected GeoIP response payload: {type(data).__name__}"
        )

    results: dict[str, GeoResult] = {}
    for entry in data:
        if not isinstance(entry, dict):
            log.debug("GeoIP skipping malformed entry: %r", entry)
            continue
        if entry.get("status") != "success":
            log.debug("GeoIP failed for %s: %s", entry.get("query"), entry.get("message"))
            continue
        # A single bad entry (no "query", null or non-numeric coordinates)
        # is skipped so it cannot throw away the rest of the batch.
        try:
            ip = entry["query"]
            results[ip] = GeoResult(
                ip=ip,
                country=entry.get("countryCode", ""),
                city=entry.get("city", ""),
                lat=float(entry.get("lat", 0.0)),
                lon=float(entry.get("lon", 0.0)),
            )
        except (KeyError, TypeError, ValueError) as exc:
            log.debug("GeoIP skipping malformed entry %r: %s", entry, exc)
    return results
|