180 lines
5.9 KiB
Python
180 lines
5.9 KiB
Python
"""
|
|
Configuration loader for xonotic_exporter.
|
|
|
|
TOML format:
|
|
[exporter]
|
|
host = "0.0.0.0"
|
|
port = 9260
|
|
|
|
[[servers]]
|
|
name = "vehicles"
|
|
host = "localhost"
|
|
port = 26010
|
|
rcon_password = ""
|
|
rcon_mode = 2 # 0=nonsecure, 1=secure-time, 2=secure-challenge (MD4)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import tomllib
|
|
import os
|
|
from dataclasses import dataclass, field
|
|
from typing import Optional
|
|
|
|
|
|
# ── RCON mode constants ────────────────────────────────────────────────────────
|
|
|
|
# Numeric values accepted for a server's ``rcon_mode`` setting.
RCON_MODE_NONSECURE = 0
RCON_MODE_SECURE_TIME = 1
RCON_MODE_SECURE_CHALLENGE = 2  # MD4 HMAC challenge/response

# Display label for each mode number.
RCON_MODE_NAMES = {
    RCON_MODE_NONSECURE: "nonsecure",
    RCON_MODE_SECURE_TIME: "secure-time",
    RCON_MODE_SECURE_CHALLENGE: "secure-challenge (MD4)",
}

# The mode numbers that have a label above, as a set for membership tests.
VALID_RCON_MODES = {*RCON_MODE_NAMES}
|
|
|
|
|
|
# ── Data classes ───────────────────────────────────────────────────────────────
|
|
|
|
@dataclass
class ServerConfig:
    """Settings for one server entry, checked as soon as it is constructed.

    Raises:
        ConfigError: if ``name`` or ``host`` is empty, ``port`` is outside
            1-65535, or ``rcon_mode`` is not in ``VALID_RCON_MODES``.
    """

    name: str
    host: str
    port: int
    rcon_password: str
    rcon_mode: int

    def __post_init__(self) -> None:
        # ``name`` is checked first because every later message embeds it.
        name_missing = not self.name
        if name_missing:
            raise ConfigError("server 'name' must not be empty")

        host_missing = not self.host
        if host_missing:
            raise ConfigError(f"[{self.name}] 'host' must not be empty")

        port_in_range = 1 <= self.port <= 65535
        if not port_in_range:
            raise ConfigError(f"[{self.name}] 'port' must be 1-65535, got {self.port}")

        # Guard clause: a known mode means the entry is fully valid.
        if self.rcon_mode in VALID_RCON_MODES:
            return
        raise ConfigError(
            f"[{self.name}] 'rcon_mode' must be one of {sorted(VALID_RCON_MODES)}, "
            f"got {self.rcon_mode}"
        )
|
|
|
|
|
|
@dataclass
class ExporterConfig:
    """Exporter listen settings plus the list of configured servers.

    A name -> server lookup table is built from ``servers`` right after
    construction; ``get_server`` and ``server_names`` read only that table.
    """

    host: str = "0.0.0.0"
    port: int = 9260
    servers: list[ServerConfig] = field(default_factory=list)

    # Lookup table keyed by server name; not an ``__init__`` argument and
    # left out of ``repr``.
    _index: dict[str, ServerConfig] = field(
        default_factory=dict,
        init=False,
        repr=False,
    )

    def __post_init__(self) -> None:
        self._build_index()

    def _build_index(self) -> None:
        """Rebuild the lookup table from the current ``servers`` list."""
        by_name = {}
        for server in self.servers:
            by_name[server.name] = server
        self._index = by_name

    def get_server(self, name: str) -> Optional[ServerConfig]:
        """Return the server called *name*, or ``None`` when there is none."""
        match = self._index.get(name)
        return match

    def server_names(self) -> list[str]:
        """Return the indexed server names as a new list."""
        return list(self._index.keys())
|
|
|
|
|
|
# ── Exceptions ─────────────────────────────────────────────────────────────────
|
|
|
|
class ConfigError(ValueError):
    """Error type this module raises to report a configuration problem."""
|
|
|
|
|
|
# ── Loader ─────────────────────────────────────────────────────────────────────
|
|
|
|
# Values used when the [exporter] table leaves a key out.
_EXPORTER_DEFAULTS: dict = {"host": "0.0.0.0", "port": 9260}

# Values used when a [[servers]] entry leaves a key out.
_SERVER_DEFAULTS: dict = {"port": 26000, "rcon_mode": RCON_MODE_SECURE_CHALLENGE}
|
|
|
|
|
|
def load_config(path: str) -> ExporterConfig:
    """Parse and validate *path* (a TOML file). Returns :class:`ExporterConfig`.

    Raises:
        ConfigError: if *path* does not exist, is not valid UTF-8, is not
            valid TOML, or is rejected by ``_parse_raw``.
    """
    try:
        # tomllib.load() requires a binary file object and decodes it itself.
        with open(path, "rb") as fh:
            raw = tomllib.load(fh)
    except FileNotFoundError as exc:
        # Chain explicitly, as the TOML handler below does, so the traceback
        # reports a direct cause instead of an error "during handling".
        raise ConfigError(f"config file not found: {path}") from exc
    except UnicodeDecodeError as exc:
        # Undecodable bytes surface as UnicodeDecodeError, not
        # TOMLDecodeError; report them as a configuration problem too.
        raise ConfigError(f"config file is not valid UTF-8: {path}") from exc
    except tomllib.TOMLDecodeError as exc:
        raise ConfigError(f"TOML parse error: {exc}") from exc

    return _parse_raw(raw)
|
|
|
|
|
|
def load_config_from_string(text: str) -> ExporterConfig:
    """Build an :class:`ExporterConfig` from TOML source held in *text*.

    Used in tests / --validate. TOML syntax errors are re-raised as
    ``ConfigError``; the parsed document is then validated by ``_parse_raw``.
    """
    try:
        parsed = tomllib.loads(text)
    except tomllib.TOMLDecodeError as err:
        raise ConfigError(f"TOML parse error: {err}") from err
    else:
        return _parse_raw(parsed)
|
|
|
|
|
|
def _is_int(value: object) -> bool:
    """Return True for genuine integers only; ``bool`` values are rejected."""
    # bool is a subclass of int, so a TOML ``true``/``false`` would otherwise
    # pass an isinstance(value, int) check and be used as 1/0.
    return isinstance(value, int) and not isinstance(value, bool)


def _parse_raw(raw: dict) -> ExporterConfig:
    """Validate a parsed TOML document and build the :class:`ExporterConfig`.

    Args:
        raw: Top-level mapping produced by ``tomllib``.

    Returns:
        The exporter settings with one ``ServerConfig`` per ``[[servers]]``
        entry, in the order the entries appear.

    Raises:
        ConfigError: if ``exporter`` is not a table, a value has the wrong
            type (booleans are not accepted as integers), the exporter port
            is outside 1-65535, ``servers`` is missing or empty, a server
            name is missing, empty or duplicated, or ``ServerConfig``
            rejects an entry.
    """
    exp_raw = raw.get("exporter", {})
    if not isinstance(exp_raw, dict):
        # e.g. ``exporter = 1``: fail with ConfigError, not AttributeError.
        raise ConfigError("'exporter' must be a table ([exporter])")

    exp_host = exp_raw.get("host", _EXPORTER_DEFAULTS["host"])
    if not isinstance(exp_host, str):
        raise ConfigError("[exporter] 'host' must be a string")

    exp_port = exp_raw.get("port", _EXPORTER_DEFAULTS["port"])
    if not _is_int(exp_port) or not (1 <= exp_port <= 65535):
        raise ConfigError(f"[exporter] 'port' must be 1-65535, got {exp_port!r}")

    servers_raw = raw.get("servers", [])
    if not isinstance(servers_raw, list):
        raise ConfigError("'servers' must be an array of tables ([[servers]])")
    if not servers_raw:
        raise ConfigError("at least one [[servers]] entry is required")

    servers: list[ServerConfig] = []
    seen_names: set[str] = set()

    # Entries are numbered from 1 in error messages.
    for idx, entry in enumerate(servers_raw, start=1):
        if not isinstance(entry, dict):
            raise ConfigError(f"servers[{idx}] must be a table")

        name = entry.get("name", "")
        if not isinstance(name, str) or not name:
            raise ConfigError(f"servers[{idx}] missing or empty 'name'")
        if name in seen_names:
            raise ConfigError(f"duplicate server name: {name!r}")
        seen_names.add(name)

        # A missing host becomes "" here and is rejected by ServerConfig.
        host = entry.get("host", "")
        if not isinstance(host, str):
            raise ConfigError(f"[{name}] 'host' must be a string")

        port = entry.get("port", _SERVER_DEFAULTS["port"])
        if not _is_int(port):
            raise ConfigError(f"[{name}] 'port' must be an integer")

        rcon_password = entry.get("rcon_password", "")
        if not isinstance(rcon_password, str):
            raise ConfigError(f"[{name}] 'rcon_password' must be a string")

        rcon_mode = entry.get("rcon_mode", _SERVER_DEFAULTS["rcon_mode"])
        if not _is_int(rcon_mode):
            raise ConfigError(f"[{name}] 'rcon_mode' must be an integer (0, 1, or 2)")

        # Range checks for port and rcon_mode happen in ServerConfig itself.
        servers.append(
            ServerConfig(
                name=name,
                host=host,
                port=port,
                rcon_password=rcon_password,
                rcon_mode=rcon_mode,
            )
        )

    return ExporterConfig(host=exp_host, port=exp_port, servers=servers)
|