Files
DerGrumpf 76adff71d7 Init
2026-04-26 16:56:13 +00:00

344 lines
13 KiB
Python

"""
Minimal async HTTP server for the exporter.
Uses only the standard library (asyncio streams) — no aiohttp
dependency. Endpoints:
GET / → HTML index listing configured servers
GET /metrics?target=<n> → Prometheus exposition for server <n>
GET /geo?target=<n> → Prometheus player geolocation metrics for server <n>
GET /match?target=<n> → Prometheus current-match metrics for server <n>
POST /-/reload → Reload config from disk (SIGHUP also works)
GET /-/healthy → 200 OK liveness probe
"""
from __future__ import annotations

import asyncio
import html
import logging
import signal
import sys
from http import HTTPStatus
from typing import Optional
from urllib.parse import parse_qs, quote, urlparse

from .config import ExporterConfig, load_config, ConfigError
from .geoip import GeoIPCache
from .prometheus import CONTENT_TYPE, build_registry, build_player_geo_registry, build_match_registry
from .rcon import RconError, scrape_server, scrape_match
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# HTML template for the index page ("GET /"). It is rendered with
# str.format(items=...), so {items} is the only substitution field and the
# literal CSS braces are doubled ({{ / }}) to survive formatting.
_INDEX_HTML = """\
<!DOCTYPE html>
<html lang="en">
<head><meta charset="utf-8"><title>Xonotic Exporter</title>
<style>body{{font-family:monospace;max-width:700px;margin:2rem auto}}
a{{color:#4fa}}li{{margin:.3rem 0}}</style>
</head>
<body>
<h1>Xonotic Prometheus Exporter</h1>
<p>Configured servers:</p>
<ul>
{items}
</ul>
<p><a href="/-/healthy">/-/healthy</a> &nbsp; <a href="/-/reload" id="rl">/-/reload (POST)</a></p>
</body></html>
"""
# ── Request / Response helpers ─────────────────────────────────────────────────
class _Request:
    """Parsed HTTP request: method, path, query mapping and raw body."""

    def __init__(self, method: str, path: str, query: dict, body: bytes) -> None:
        # Plain value holder; the arguments are stored exactly as given.
        self.method, self.path = method, path
        self.query, self.body = query, body

    def qparam(self, key: str) -> Optional[str]:
        """Return the first value stored for *key*, or None if absent/empty."""
        values = self.query.get(key)
        if not values:
            return None
        return values[0]
def _response(
writer: asyncio.StreamWriter,
status: int,
content_type: str,
body: bytes | str,
) -> None:
if isinstance(body, str):
body = body.encode()
status_text = {
200: "OK",
400: "Bad Request",
404: "Not Found",
500: "Internal Server Error",
}.get(status, "Unknown")
header = (
f"HTTP/1.1 {status} {status_text}\r\n"
f"Content-Type: {content_type}\r\n"
f"Content-Length: {len(body)}\r\n"
f"Connection: close\r\n"
f"\r\n"
).encode()
writer.write(header + body)
# ── Main server class ──────────────────────────────────────────────────────────
class XonoticExporterServer:
    """HTTP server built on asyncio streams that serves the exporter endpoints.

    Every connection carries exactly one request: it is parsed, routed to a
    handler, answered with ``Connection: close`` and then closed.
    """

    # Total time budget (seconds) for receiving one request (line, headers, body).
    _READ_TIMEOUT = 10.0
    # Upper bound on the number of body bytes that are read.
    _MAX_BODY = 8192
    # Requests with more header lines than this are rejected as malformed.
    _MAX_HEADERS = 100
    # Plain-text responses may contain non-ASCII characters, so declare the charset.
    _TEXT_PLAIN = "text/plain; charset=utf-8"

    def __init__(self, config: "ExporterConfig", config_path: Optional[str] = None) -> None:
        """Store the configuration; no socket is opened until start().

        Args:
            config: Active exporter configuration.
            config_path: File that reload() re-reads; reload is disabled
                when this is empty or None.
        """
        self._config = config
        self._config_path = config_path
        self._server: Optional[asyncio.Server] = None
        # shared GeoIP cache — lives for the lifetime of the process
        # so cached results survive across Prometheus scrapes
        self._geo_cache = GeoIPCache()

    # ── Public API ─────────────────────────────────────────────────────────

    async def start(self) -> None:
        """Bind the listening socket on the configured host and port."""
        host = self._config.host
        port = self._config.port
        self._server = await asyncio.start_server(
            self._handle_connection, host, port
        )
        log.info("Xonotic exporter listening on http://%s:%d", host, port)

    async def serve_forever(self) -> None:
        """Serve requests until cancelled, starting the server first if needed."""
        if self._server is None:
            await self.start()
        async with self._server:
            await self._server.serve_forever()

    def reload(self) -> bool:
        """Reload configuration from disk. Returns True on success.

        On failure the previous configuration stays active. Only the stored
        config object is swapped; the already-bound listening socket is not
        touched, so a changed host/port takes effect only after a restart.
        """
        if not self._config_path:
            log.warning("No config path set — cannot reload")
            return False
        try:
            new_cfg = load_config(self._config_path)
        except (ConfigError, OSError) as exc:
            # OSError is caught as well in case an unreadable/missing file
            # is not reported as ConfigError by load_config.
            log.error("Config reload failed: %s", exc)
            return False
        self._config = new_cfg
        log.info("Configuration reloaded from %s", self._config_path)
        return True

    # ── Connection handler ─────────────────────────────────────────────────

    async def _handle_connection(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
    ) -> None:
        """Serve a single request on a freshly accepted connection.

        Unparseable requests are dropped without a response. A handler that
        raises unexpectedly is logged with its traceback and answered with a
        500 instead of silently closing the socket.
        """
        try:
            req = await self._parse_request(reader)
            if req is None:
                return
            try:
                await self._dispatch(req, writer)
            except Exception:
                # Handlers write their response as their last action, so
                # nothing has been queued yet when one of them raises.
                log.exception("Unhandled error while serving %s %s", req.method, req.path)
                _response(writer, 500, self._TEXT_PLAIN, "Internal server error\n")
            await writer.drain()
        except Exception as exc:
            log.debug("Connection error: %s", exc)
        finally:
            writer.close()
            try:
                await writer.wait_closed()
            except Exception:
                # Best effort: the peer may already have dropped the
                # connection, which is of no further interest here.
                pass

    async def _parse_request(self, reader: asyncio.StreamReader) -> "Optional[_Request]":
        """Read and parse one HTTP request from *reader*.

        Returns:
            The parsed request, or None when the client sent nothing, was
            too slow, or the request is malformed (bad request line,
            non-numeric Content-Length, undecodable headers, over-long
            lines, too many headers).
        """
        try:
            # One timeout for the whole request so a client cannot stall the
            # connection by trickling header lines.
            return await asyncio.wait_for(self._read_request(reader), self._READ_TIMEOUT)
        except asyncio.TimeoutError:
            return None
        except ValueError:
            # Raised by int(), bytes.decode(), urlparse() and by
            # StreamReader.readline() when a line exceeds the buffer limit.
            return None

    async def _read_request(self, reader: asyncio.StreamReader) -> "Optional[_Request]":
        """Parse request line, headers and body; may raise ValueError."""
        request_line = await reader.readline()
        if not request_line:
            return None
        parts = request_line.decode(errors="replace").strip().split()
        if len(parts) < 2:
            return None
        method, raw_path = parts[0].upper(), parts[1]

        # read headers (an empty line or EOF ends the header section)
        headers: dict[str, str] = {}
        for _ in range(self._MAX_HEADERS + 1):
            line = (await reader.readline()).strip()
            if not line:
                break
            name, sep, value = line.partition(b":")
            if sep:
                headers[name.strip().lower().decode()] = value.strip().decode()
        else:
            # Loop ran out without reaching the blank line: too many headers.
            return None

        # read body if present
        content_length = int(headers.get("content-length", 0))
        body = b""
        if content_length > 0:
            try:
                # readexactly() waits for the full amount, whereas read(n)
                # may return after the first partial chunk.
                body = await reader.readexactly(min(content_length, self._MAX_BODY))
            except asyncio.IncompleteReadError as exc:
                body = exc.partial

        parsed = urlparse(raw_path)
        return _Request(method, parsed.path, parse_qs(parsed.query), body)

    # ── Router ─────────────────────────────────────────────────────────────

    async def _dispatch(self, req: "_Request", writer: asyncio.StreamWriter) -> None:
        """Route *req* by exact path and method; anything else gets a 404."""
        route = (req.method, req.path)
        if route == ("GET", "/"):
            self._handle_index(writer)
        elif route == ("GET", "/metrics"):
            await self._handle_metrics(req, writer)
        elif route == ("GET", "/geo"):
            await self._handle_geo(req, writer)
        elif route == ("GET", "/match"):
            await self._handle_match(req, writer)
        elif route == ("POST", "/-/reload"):
            self._handle_reload(writer)
        elif route == ("GET", "/-/healthy"):
            _response(writer, 200, self._TEXT_PLAIN, "OK\n")
        else:
            _response(writer, 404, self._TEXT_PLAIN, "Not found\n")

    # ── Handlers ───────────────────────────────────────────────────────────

    def _resolve_target(self, req: "_Request", writer: asyncio.StreamWriter) -> Optional[tuple]:
        """Look up the server named by the ``target`` query parameter.

        Returns:
            ``(target, server)`` on success. When the parameter is missing
            or names no configured server, a 400 response is queued on
            *writer* and None is returned.
        """
        target = req.qparam("target")
        if not target:
            _response(writer, 400, self._TEXT_PLAIN, "'target' query parameter required\n")
            return None
        server = self._config.get_server(target)
        if server is None:
            _response(writer, 400, self._TEXT_PLAIN,
                      f"Unknown target: {target!r}. Known: {self._config.server_names()}\n")
            return None
        return target, server

    def _handle_index(self, writer: asyncio.StreamWriter) -> None:
        """Render the HTML index with links to every configured server."""
        rows = []
        for name in self._config.server_names():
            # Names come from the config file; escape them so characters
            # such as '&', '<' or spaces cannot break the markup or the URL.
            label = html.escape(str(name))
            query = quote(str(name), safe="")
            rows.append(
                f' <li>'
                f'<a href="/metrics?target={query}">{label} metrics</a> &nbsp; '
                f'<a href="/geo?target={query}">{label} geo</a> &nbsp; '
                f'<a href="/match?target={query}">{label} match</a>'
                f'</li>'
            )
        page = _INDEX_HTML.format(items="\n".join(rows))
        _response(writer, 200, "text/html; charset=utf-8", page)

    async def _handle_metrics(self, req: "_Request", writer: asyncio.StreamWriter) -> None:
        """Scrape the target and answer with its Prometheus exposition.

        A failed scrape still answers 200, with a registry built for
        ``up=False``.
        """
        resolved = self._resolve_target(req, writer)
        if resolved is None:
            return
        target, server = resolved
        try:
            metrics = await scrape_server(server)
            _, raw = build_registry(server.name, metrics, up=True)
        except (RconError, OSError, asyncio.TimeoutError) as exc:
            log.error("[%s] scrape failed: %s", target, exc)
            _, raw = build_registry(server.name, None, up=False)
        _response(writer, 200, CONTENT_TYPE, raw)

    async def _handle_geo(self, req: "_Request", writer: asyncio.StreamWriter) -> None:
        """Scrape the target and answer with player geolocation metrics.

        Scrape or registry-build failures answer 200 with an empty body.
        """
        resolved = self._resolve_target(req, writer)
        if resolved is None:
            return
        target, server = resolved
        try:
            metrics = await scrape_server(server)
        except (RconError, OSError, asyncio.TimeoutError) as exc:
            log.error("[%s] scrape failed for geo endpoint: %s", target, exc)
            _response(writer, 200, CONTENT_TYPE, b"")
            return

        # IPs of non-bot players only. No public/private filtering happens
        # here. NOTE(review): presumably GeoIPCache skips non-public
        # addresses — confirm.
        player_ips = [p.ip for p in metrics.players if not p.is_bot]
        # resolve via the shared cache so repeated scrapes reuse earlier results
        geo_results = await self._geo_cache.lookup(player_ips)
        log.debug(
            "[%s] geo: %d players, %d public IPs, %d resolved, cache size %d",
            target,
            len(metrics.players),
            len(player_ips),
            len(geo_results),
            self._geo_cache.cache_size(),
        )
        try:
            raw = build_player_geo_registry(server.name, metrics.players, geo_results)
        except Exception as exc:
            log.error("[%s] geo registry build failed: %s", target, exc, exc_info=True)
            _response(writer, 200, CONTENT_TYPE, b"")
            return
        _response(writer, 200, CONTENT_TYPE, raw)

    def _handle_reload(self, writer: asyncio.StreamWriter) -> None:
        """Reload the configuration and report the outcome (200 or 500)."""
        if self.reload():
            _response(writer, 200, self._TEXT_PLAIN, "Config reloaded\n")
        else:
            _response(writer, 500, self._TEXT_PLAIN, "Config reload failed — check logs\n")

    async def _handle_match(self, req: "_Request", writer: asyncio.StreamWriter) -> None:
        """Scrape server and match data concurrently and answer with match metrics.

        Scrape or registry-build failures answer 200 with an empty body.
        """
        resolved = self._resolve_target(req, writer)
        if resolved is None:
            return
        target, server = resolved
        try:
            metrics, match_meta = await asyncio.gather(
                scrape_server(server),
                scrape_match(server),
            )
        except (RconError, OSError, asyncio.TimeoutError) as exc:
            log.error("[%s] match scrape failed: %s", target, exc)
            _response(writer, 200, CONTENT_TYPE, b"")
            return
        try:
            raw = build_match_registry(server.name, metrics, match_meta)
        except Exception as exc:
            log.error("[%s] match registry build failed: %s", target, exc, exc_info=True)
            _response(writer, 200, CONTENT_TYPE, b"")
            return
        _response(writer, 200, CONTENT_TYPE, raw)
# ── Entrypoint used by cli.py ──────────────────────────────────────────────────
def run_server(config: ExporterConfig, config_path: Optional[str] = None) -> None:
    """Run the exporter and block until the event loop ends.

    A KeyboardInterrupt (Ctrl-C / SIGINT) is caught and logged. Where
    SIGHUP exists and the platform is not Windows, SIGHUP triggers a
    configuration reload. No SIGTERM handler is installed here.

    Args:
        config: Configuration the server starts with.
        config_path: Path handed to the server for later reloads.
    """
    exporter = XonoticExporterServer(config, config_path)

    async def _main() -> None:
        await exporter.start()
        running_loop = asyncio.get_running_loop()
        # SIGHUP → reload config (Unix only)
        sighup_supported = hasattr(signal, "SIGHUP") and sys.platform != "win32"
        if sighup_supported:
            running_loop.add_signal_handler(signal.SIGHUP, exporter.reload)
            log.info("Send SIGHUP to reload configuration")
        await exporter.serve_forever()

    try:
        asyncio.run(_main())
    except KeyboardInterrupt:
        log.info("Interrupted — shutting down")