# File: Xonotic-Exporter/xonotic_exporter/metrics_parser.py
# Last commit: 76adff71d7 "Init" by DerGrumpf, 2026-04-26 16:56:13 +00:00
# Size: 298 lines, 9.9 KiB (Python)

"""
Parser for the combined output of the two console commands:

    sv_public
    status 1

Both commands are sent in a single RCON packet, separated by \\0.
The server sends the replies back as two separate UDP datagrams (or sometimes
as one), so raw bytes are fed in chunks until the state machine reaches DONE.
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Optional
class ParseError(ValueError):
    """Raised when the parser receives unexpected input.

    Subclasses :class:`ValueError`, so callers that already catch
    ``ValueError`` also catch this.
    """
class _State(Enum):
    """Parser states, listed in the order the parser walks through them.

    Each state names the reply line the parser is currently waiting for;
    lines that do not match the current state are skipped.
    """

    SV_PUBLIC = auto()       # waiting for the ``sv_public`` cvar line
    HOSTNAME = auto()        # waiting for ``host: ...``
    VERSION = auto()         # waiting for ``version: ...`` (value is not stored)
    PROTOCOL = auto()        # waiting for ``protocol: ...`` (value is not stored)
    MAP = auto()             # waiting for ``map: ...``
    TIMING = auto()          # waiting for ``timing: ...``
    PLAYERS_HEADER = auto()  # waiting for ``players: N active (M max)``
    STATUS_HEADER = auto()   # waiting for the ``IP ...`` column header row
    PLAYER_ROW = auto()      # consuming one row per announced player
    DONE = auto()            # terminal state; further input is ignored
@dataclass
class PlayerRow:
    """One parsed row of the player table from the ``status 1`` reply."""

    ip: str             # address without port; "botclient" for bots
    port: str           # port as text; "0" when the address carries none
    ping: int           # third column of the row; 0 if not an integer
    packetloss: int     # second column of the row; 0 if not an integer
    time_seconds: int   # time column (H:MM:SS or M:SS) converted to seconds
    frags: int          # fifth column of the row; 0 if not an integer
    slot: str           # sixth column, kept as the raw text from the row
    name: str           # remaining columns joined by single spaces, colours removed
    is_bot: bool        # True when the address column reads "botclient"
    is_spectator: bool  # True for non-bots whose frag count is -666
@dataclass
class XonoticMetrics:
    """All metrics scraped from one server query.

    Every field has a neutral default, so a freshly created instance is a
    valid "nothing parsed yet" result.
    """

    # -- server identity ---------------------------------------------------
    hostname: str = ""
    map_name: str = ""
    sv_public: int = 0

    # -- values from the "timing:" line --------------------------------------
    timing_cpu: float = 0.0
    timing_lost: float = 0.0
    timing_offset_avg: float = 0.0
    timing_offset_max: float = 0.0
    timing_offset_sdev: float = 0.0

    # -- players ---------------------------------------------------------------
    # One PlayerRow per parsed row; default_factory gives each instance its
    # own list instead of a shared one.
    players: list = field(default_factory=list)  # list[PlayerRow]
    players_count: int = 0       # N from "players: N active (M max)"
    players_max: int = 0         # M from "players: N active (M max)"
    players_active: int = 0      # scored players (neither bot nor spectator)
    players_spectators: int = 0
    players_bots: int = 0

    # -- network RTT (filled in by the protocol layer) ---------------------------
    ping: float = 0.0
# ── compiled regexes ───────────────────────────────────────────────────────────
# All patterns work on raw bytes because the parser is fed UDP payloads directly.

# Colour codes: "^" followed by one digit, or "^x" followed by three hex digits.
_COLORS_RE = re.compile(rb"\^(?:\d|x[\dA-Fa-f]{3})")

# sv_public cvar line; group 1 is the (possibly negative) integer value.
_SV_PUBLIC_RE = re.compile(rb'sv_public\S*\s+is\s+"(-?\d+)')

# "host: <name>"; group 1 is the rest of the line.
_HOST_RE = re.compile(rb"^host:\s+(.+)$")

# "map: <name>"; group 1 is the first whitespace-delimited token.
_MAP_RE = re.compile(rb"^map:\s+(\S+)")

# "timing:" line; named groups: cpu, lost, offset_avg, max, sdev.
_TIMING_RE = re.compile(
    rb"^timing:\s+"
    rb"(?P<cpu>-?[\d.]+)%\s+CPU,\s+"
    rb"(?P<lost>-?[\d.]+)%\s+lost,\s+"
    rb"offset\s+avg\s+(?P<offset_avg>-?[\d.]+)\s*ms,\s+"
    rb"max\s+(?P<max>-?[\d.]+)ms,\s+"
    rb"sdev\s+(?P<sdev>-?[\d.]+)ms"
)

# "players: N active (M max)"; named groups: count, max.
_PLAYERS_RE = re.compile(
    rb"players:\s+(?P<count>\d+)\s+active\s+\((?P<max>\d+)\s+max\)"
)

# Column header of the player table, with or without a leading "^2".
_STATUS_HDR_RE = re.compile(rb"^\^?2?IP\s")
class XonoticMetricsParser:
    """Incremental parser for the reply to ``sv_public`` + ``status 1``.

    Feed raw UDP payload bytes via :meth:`feed` until :attr:`done` is True,
    then read the result from :attr:`metrics`.

    The parser is a line-oriented state machine: every state waits for one
    particular line of the reply, skips anything else, and advances once
    that line has been seen.
    """

    def __init__(self) -> None:
        self._buf: bytes = b""                 # bytes received but not yet split into lines
        self._state: _State = _State.SV_PUBLIC
        self._expected_players: int = 0        # rows announced by the "players:" line
        self._seen_players: int = 0            # rows consumed so far
        self.metrics: XonoticMetrics = XonoticMetrics()

    @property
    def done(self) -> bool:
        """True once the whole reply has been parsed."""
        return self._state is _State.DONE

    # ── public feed ────────────────────────────────────────────────────────────
    def feed(self, data: bytes) -> None:
        """Append *data* to the buffer and process every complete line.

        A trailing fragment without a newline stays buffered until a later
        call completes it. Input received after the parser is done is
        discarded.
        """
        if self.done:
            return
        self._buf += data
        while not self.done:
            line, newline, rest = self._buf.partition(b"\n")
            if not newline:
                break  # incomplete line: wait for more data
            self._buf = rest
            self._process_line(line.rstrip(b"\r"))

    # ── dispatcher ────────────────────────────────────────────────────────────
    def _process_line(self, line: bytes) -> None:
        """Hand *line* to the handler of the current state (none for DONE)."""
        handlers = {
            _State.SV_PUBLIC: self._parse_sv_public,
            _State.HOSTNAME: self._parse_hostname,
            _State.VERSION: self._parse_version,
            _State.PROTOCOL: self._parse_protocol,
            _State.MAP: self._parse_map,
            _State.TIMING: self._parse_timing,
            _State.PLAYERS_HEADER: self._parse_players_header,
            _State.STATUS_HEADER: self._parse_status_header,
            _State.PLAYER_ROW: self._parse_player_row,
        }
        handler = handlers.get(self._state)
        if handler is not None:
            handler(line)

    # ── state handlers ────────────────────────────────────────────────────────
    def _parse_sv_public(self, line: bytes) -> None:
        """Store the ``sv_public`` value; other lines (blank/stray) are ignored."""
        m = _SV_PUBLIC_RE.search(_COLORS_RE.sub(b"", line))
        if m:
            self.metrics.sv_public = int(m.group(1))
            self._state = _State.HOSTNAME

    def _parse_hostname(self, line: bytes) -> None:
        """Store the colour-stripped host name from the ``host:`` line."""
        m = _HOST_RE.match(line)
        if m:
            raw = _COLORS_RE.sub(b"", m.group(1).strip())
            self.metrics.hostname = raw.decode("utf-8", "replace")
            self._state = _State.VERSION

    def _parse_version(self, line: bytes) -> None:
        """Skip past the ``version:`` line; its value is not recorded."""
        if line.startswith(b"version:"):
            self._state = _State.PROTOCOL

    def _parse_protocol(self, line: bytes) -> None:
        """Skip past the ``protocol:`` line; its value is not recorded."""
        if line.startswith(b"protocol:"):
            self._state = _State.MAP

    def _parse_map(self, line: bytes) -> None:
        """Store the colour-stripped map name from the ``map:`` line."""
        m = _MAP_RE.match(line)
        if m:
            raw = _COLORS_RE.sub(b"", m.group(1))
            self.metrics.map_name = raw.decode("utf-8", "replace")
            self._state = _State.TIMING

    def _parse_timing(self, line: bytes) -> None:
        """Store the five float values of the ``timing:`` line."""
        m = _TIMING_RE.match(line)
        if m:
            d = m.groupdict()
            self.metrics.timing_cpu = float(d["cpu"])
            self.metrics.timing_lost = float(d["lost"])
            self.metrics.timing_offset_avg = float(d["offset_avg"])
            self.metrics.timing_offset_max = float(d["max"])
            self.metrics.timing_offset_sdev = float(d["sdev"])
            self._state = _State.PLAYERS_HEADER

    def _parse_players_header(self, line: bytes) -> None:
        """Store N and M from ``players: N active (M max)``."""
        m = _PLAYERS_RE.search(_COLORS_RE.sub(b"", line))
        if m:
            self.metrics.players_count = int(m.group("count"))
            self.metrics.players_max = int(m.group("max"))
            self._state = _State.STATUS_HEADER

    def _parse_status_header(self, line: bytes) -> None:
        """Detect the table header and decide whether player rows follow.

        The header starts with "IP " or "^2IP " depending on Xonotic version.
        With zero announced players there are no rows to wait for, so the
        parser finishes right away.
        """
        if not _STATUS_HDR_RE.match(line):
            return
        if self.metrics.players_count > 0:
            self._expected_players = self.metrics.players_count
            self._seen_players = 0
            self._state = _State.PLAYER_ROW
        else:
            self._state = _State.DONE

    def _parse_player_row(self, line: bytes) -> None:
        """Parse one player-table row and update the per-category counters.

        Columns after colour stripping and whitespace splitting:
        address, packet loss, ping, time, frags, slot, name words...
        Lines with fewer than five columns are skipped and do not count
        towards the announced number of rows. Numeric columns that fail to
        parse become 0. The parser is done once the announced number of rows
        has been consumed.
        """
        fields = _COLORS_RE.sub(b"", line).split()
        if len(fields) < 5:
            return
        self._seen_players += 1

        ip, port, is_bot = self._split_address(fields[0].decode("utf-8", "replace"))
        packetloss = self._to_int(fields[1])
        ping = self._to_int(fields[2])
        time_seconds = self._parse_duration(fields[3])
        frags = self._to_int(fields[4])
        # A frag count of -666 on a non-bot row is what marks a spectator here.
        is_spectator = frags == -666 and not is_bot
        slot = fields[5].decode("utf-8", "replace") if len(fields) > 5 else ""
        # Name words are re-joined with single spaces (empty when absent).
        name = b" ".join(fields[6:]).decode("utf-8", "replace")

        if is_bot:
            self.metrics.players_bots += 1
        elif is_spectator:
            self.metrics.players_spectators += 1
        else:
            self.metrics.players_active += 1

        self.metrics.players.append(PlayerRow(
            ip=ip,
            port=port,
            ping=ping,
            packetloss=packetloss,
            time_seconds=time_seconds,
            frags=frags,
            slot=slot,
            name=name,
            is_bot=is_bot,
            is_spectator=is_spectator,
        ))

        if self._seen_players >= self._expected_players:
            self._state = _State.DONE

    # ── helpers ───────────────────────────────────────────────────────────────
    @staticmethod
    def _split_address(address: str) -> tuple[str, str, bool]:
        """Split the address column into ``(ip, port, is_bot)``.

        Handles "botclient", "[ipv6]:port", "host:port" and bare addresses.
        The port is "0" whenever the address carries none. A bracketed
        address with no closing "]" is treated as an address without a port
        rather than being sliced at a bogus position.
        """
        if address == "botclient":
            return "botclient", "0", True
        if ":" not in address:
            return address, "0", False
        if address.startswith("["):
            closing = address.rfind("]")
            if closing == -1:
                # Malformed: keep everything after "[" as the address.
                return address[1:], "0", False
            # Skip "]:" to reach the port; an empty remainder means no port.
            return address[1:closing], address[closing + 2:] or "0", False
        host, _, port = address.rpartition(":")
        return host, port, False

    @staticmethod
    def _parse_duration(raw: bytes) -> int:
        """Convert an ``H:MM:SS`` or ``M:SS`` column to seconds (0 if invalid)."""
        parts = raw.decode("utf-8", "replace").split(":")
        try:
            numbers = [int(part) for part in parts]
        except ValueError:
            return 0
        if len(numbers) == 3:
            return numbers[0] * 3600 + numbers[1] * 60 + numbers[2]
        if len(numbers) == 2:
            return numbers[0] * 60 + numbers[1]
        return 0

    @staticmethod
    def _to_int(raw: bytes) -> int:
        """Return *raw* parsed as an integer, or 0 when it is not one."""
        try:
            return int(raw)
        except ValueError:
            return 0

    @staticmethod
    def strip_colors(data: bytes) -> bytes:
        """Return *data* with all colour codes removed."""
        return _COLORS_RE.sub(b"", data)