298 lines
9.9 KiB
Python
298 lines
9.9 KiB
Python
"""
Parser for the combined output of:

    sv_public
    status 1

Both commands are sent in a single RCON packet separated by \\0.
The server sends them back as two separate UDP datagrams (or sometimes one).
We feed raw bytes in chunks until the state machine reaches DONE.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
from enum import Enum, auto
|
|
from typing import Optional
|
|
|
|
|
|
class ParseError(ValueError):
    """Error type for input that the parser cannot interpret.

    Derives from :class:`ValueError`, so callers that already catch
    ``ValueError`` will also catch this.
    """
|
|
|
|
|
|
class _State(Enum):
|
|
SV_PUBLIC = auto()
|
|
HOSTNAME = auto()
|
|
VERSION = auto()
|
|
PROTOCOL = auto()
|
|
MAP = auto()
|
|
TIMING = auto()
|
|
PLAYERS_HEADER = auto()
|
|
STATUS_HEADER = auto()
|
|
PLAYER_ROW = auto()
|
|
DONE = auto()
|
|
|
|
|
|
@dataclass
class PlayerRow:
    """One row of the player table, as filled in by the parser."""

    # address column, split into host and port ("botclient" / "0" for bots)
    ip: str
    port: str

    # numeric columns; the parser stores 0 when a column is not an integer
    ping: int
    packetloss: int
    time_seconds: int  # time column (H:MM:SS or M:SS) converted to seconds
    frags: int

    # trailing text columns
    slot: str
    name: str

    # classification derived by the parser
    is_bot: bool        # address column was "botclient"
    is_spectator: bool  # frags == -666 on a non-bot client
|
|
|
|
|
|
@dataclass
class XonoticMetrics:
    """Everything collected from a single server query.

    Every field has a neutral default, so a freshly created instance is
    valid before any line has been parsed.
    """

    # ── server identity ──
    hostname: str = ""
    map_name: str = ""
    sv_public: int = 0

    # ── "timing:" line ──
    timing_cpu: float = 0.0
    timing_lost: float = 0.0
    timing_offset_avg: float = 0.0
    timing_offset_max: float = 0.0
    timing_offset_sdev: float = 0.0

    # ── players ──
    # Parsed table rows (list of PlayerRow); a fresh list per instance.
    players: list = field(default_factory=list)
    # Taken from the "players: N active (M max)" line.
    players_count: int = 0
    players_max: int = 0
    # Tallies built from the table rows: each row counts as exactly one of
    # bot, spectator, or active (scored) player.
    players_active: int = 0
    players_spectators: int = 0
    players_bots: int = 0

    # ── network RTT (filled in by the protocol layer) ──
    ping: float = 0.0
|
|
|
|
|
|
# ── compiled regexes ───────────────────────────────────────────────────────────
|
|
|
|
_COLORS_RE = re.compile(rb"\^(?:\d|x[\dA-Fa-f]{3})")
|
|
_SV_PUBLIC_RE = re.compile(rb'sv_public\S*\s+is\s+"(-?\d+)')
|
|
_HOST_RE = re.compile(rb"^host:\s+(.+)$")
|
|
_MAP_RE = re.compile(rb"^map:\s+(\S+)")
|
|
_TIMING_RE = re.compile(
|
|
rb"^timing:\s+"
|
|
rb"(?P<cpu>-?[\d.]+)%\s+CPU,\s+"
|
|
rb"(?P<lost>-?[\d.]+)%\s+lost,\s+"
|
|
rb"offset\s+avg\s+(?P<offset_avg>-?[\d.]+)\s*ms,\s+"
|
|
rb"max\s+(?P<max>-?[\d.]+)ms,\s+"
|
|
rb"sdev\s+(?P<sdev>-?[\d.]+)ms"
|
|
)
|
|
_PLAYERS_RE = re.compile(
|
|
rb"players:\s+(?P<count>\d+)\s+active\s+\((?P<max>\d+)\s+max\)"
|
|
)
|
|
_STATUS_HDR_RE = re.compile(rb"^\^?2?IP\s")
|
|
|
|
|
|
class XonoticMetricsParser:
    """
    Incremental, line-oriented parser for the ``sv_public`` + ``status 1`` reply.

    Feed raw UDP payload bytes via :meth:`feed` until :attr:`done` is True.
    Access the result via :attr:`metrics`.

    The parser is a forward-only state machine: in each state it waits for
    one specific kind of line and silently ignores every line that does not
    match. Only complete lines (terminated by ``\\n``) are processed; a
    trailing partial line stays buffered until more data arrives.
    """

    def __init__(self) -> None:
        self._buf: bytes = b""
        self._state: _State = _State.SV_PUBLIC
        self._expected_players: int = 0
        self._seen_players: int = 0
        self.metrics: XonoticMetrics = XonoticMetrics()
        # State -> handler table, built once instead of once per line.
        # DONE has no entry on purpose: nothing is consumed in that state.
        self._handlers = {
            _State.SV_PUBLIC: self._parse_sv_public,
            _State.HOSTNAME: self._parse_hostname,
            _State.VERSION: self._parse_version,
            _State.PROTOCOL: self._parse_protocol,
            _State.MAP: self._parse_map,
            _State.TIMING: self._parse_timing,
            _State.PLAYERS_HEADER: self._parse_players_header,
            _State.STATUS_HEADER: self._parse_status_header,
            _State.PLAYER_ROW: self._parse_player_row,
        }

    @property
    def done(self) -> bool:
        """True once the state machine has reached its terminal state."""
        return self._state is _State.DONE

    # ── public feed ────────────────────────────────────────────────────────────

    def feed(self, data: bytes) -> None:
        """Append *data* and process all complete lines.

        Does nothing once :attr:`done` is True. Processing stops as soon as
        the parser becomes done; any bytes after that point stay in the
        internal buffer unprocessed.
        """
        if self.done:
            return

        self._buf += data
        while not self.done:
            line, newline, rest = self._buf.partition(b"\n")
            if not newline:
                break  # no complete line yet: wait for more data
            self._buf = rest
            self._process_line(line.rstrip(b"\r"))

    # ── dispatcher ────────────────────────────────────────────────────────────

    def _process_line(self, line: bytes) -> None:
        """Hand *line* to the handler registered for the current state."""
        handler = self._handlers.get(self._state)
        if handler is not None:
            handler(line)

    # ── state handlers ────────────────────────────────────────────────────────

    def _parse_sv_public(self, line: bytes) -> None:
        """Record ``sv_public``; non-matching lines (blank/stray) are ignored."""
        m = _SV_PUBLIC_RE.search(_COLORS_RE.sub(b"", line))
        if m:
            self.metrics.sv_public = int(m.group(1))
            self._state = _State.HOSTNAME

    def _parse_hostname(self, line: bytes) -> None:
        """Record the colour-stripped host name from the ``host:`` line."""
        m = _HOST_RE.match(line)
        if m:
            raw = _COLORS_RE.sub(b"", m.group(1).strip())
            self.metrics.hostname = raw.decode("utf-8", "replace")
            self._state = _State.VERSION

    def _parse_version(self, line: bytes) -> None:
        """Skip past the ``version:`` line; its content is not stored."""
        if line.startswith(b"version:"):
            self._state = _State.PROTOCOL

    def _parse_protocol(self, line: bytes) -> None:
        """Skip past the ``protocol:`` line; its content is not stored."""
        if line.startswith(b"protocol:"):
            self._state = _State.MAP

    def _parse_map(self, line: bytes) -> None:
        """Record the colour-stripped map name from the ``map:`` line."""
        m = _MAP_RE.match(line)
        if m:
            raw = _COLORS_RE.sub(b"", m.group(1))
            self.metrics.map_name = raw.decode("utf-8", "replace")
            self._state = _State.TIMING

    def _parse_timing(self, line: bytes) -> None:
        """Record the five numeric values of the ``timing:`` line."""
        m = _TIMING_RE.match(line)
        if m:
            d = m.groupdict()
            self.metrics.timing_cpu = float(d["cpu"])
            self.metrics.timing_lost = float(d["lost"])
            self.metrics.timing_offset_avg = float(d["offset_avg"])
            self.metrics.timing_offset_max = float(d["max"])
            self.metrics.timing_offset_sdev = float(d["sdev"])
            self._state = _State.PLAYERS_HEADER

    def _parse_players_header(self, line: bytes) -> None:
        """Record player count and maximum from ``players: N active (M max)``."""
        m = _PLAYERS_RE.search(_COLORS_RE.sub(b"", line))
        if m:
            self.metrics.players_count = int(m.group("count"))
            self.metrics.players_max = int(m.group("max"))
            self._state = _State.STATUS_HEADER

    def _parse_status_header(self, line: bytes) -> None:
        """Consume the table header and decide whether rows are expected."""
        # "IP " or "^2IP " depending on Xonotic version
        if not _STATUS_HDR_RE.match(line):
            return
        if self.metrics.players_count > 0:
            self._expected_players = self.metrics.players_count
            self._seen_players = 0
            self._state = _State.PLAYER_ROW
        else:
            # Empty server: no rows will follow, so parsing is complete.
            self._state = _State.DONE

    def _parse_player_row(self, line: bytes) -> None:
        """Parse one player-table row and update the per-category tallies.

        Columns, by position: address, packet loss, ping, time, frags, slot,
        name. Lines with fewer than five columns are ignored and do not count
        towards the expected number of rows. The parser becomes done once as
        many rows were seen as the ``players:`` line announced.
        """
        clean = _COLORS_RE.sub(b"", line)
        # maxsplit=6 keeps the name column verbatim, so whitespace runs inside
        # a player name are not collapsed to single spaces.
        fields = clean.split(None, 6)
        if len(fields) < 5:
            return

        self._seen_players += 1

        ip, port, is_bot = self._split_address(fields[0].decode("utf-8", "replace"))
        packetloss = self._int_or_zero(fields[1])
        ping = self._int_or_zero(fields[2])
        time_seconds = self._parse_duration(fields[3])
        frags = self._int_or_zero(fields[4])

        # A frag count of -666 on a non-bot client is treated as "spectator".
        is_spectator = frags == -666 and not is_bot

        slot = fields[5].decode("utf-8", "replace") if len(fields) > 5 else ""
        name = fields[6].rstrip().decode("utf-8", "replace") if len(fields) > 6 else ""

        if is_bot:
            self.metrics.players_bots += 1
        elif is_spectator:
            self.metrics.players_spectators += 1
        else:
            self.metrics.players_active += 1

        self.metrics.players.append(PlayerRow(
            ip=ip,
            port=port,
            ping=ping,
            packetloss=packetloss,
            time_seconds=time_seconds,
            frags=frags,
            slot=slot,
            name=name,
            is_bot=is_bot,
            is_spectator=is_spectator,
        ))

        if self._seen_players >= self._expected_players:
            self._state = _State.DONE

    # ── helpers ───────────────────────────────────────────────────────────────

    @staticmethod
    def _split_address(address: str) -> "tuple[str, str, bool]":
        """Split the address column into ``(ip, port, is_bot)``.

        * ``"botclient"``    -> ``("botclient", "0", True)``
        * ``"[v6addr]:port"`` -> brackets removed, port taken after ``"]:"``
        * ``"host:port"``    -> split at the last ``":"``
        * anything else      -> returned unchanged with port ``"0"``

        A token that starts with ``"["`` but has no closing ``"]"`` is
        returned unchanged with port ``"0"`` rather than being sliced at a
        meaningless position.
        """
        if address == "botclient":
            return "botclient", "0", True
        if ":" not in address:
            return address, "0", False
        if address.startswith("["):
            head, bracket, tail = address.rpartition("]")
            if not bracket:
                return address, "0", False
            # tail is ":port" (or empty); drop the separator character.
            return head[1:], tail[1:] or "0", False
        host, _, port = address.rpartition(":")
        return host, port, False

    @staticmethod
    def _parse_duration(token: bytes) -> int:
        """Convert an ``H:MM:SS`` or ``M:SS`` token to seconds (0 if malformed)."""
        parts = token.decode("utf-8", "replace").split(":")
        try:
            if len(parts) == 3:
                return int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2])
            if len(parts) == 2:
                return int(parts[0]) * 60 + int(parts[1])
        except ValueError:
            pass  # non-numeric component: fall through to the 0 default
        return 0

    @staticmethod
    def _int_or_zero(token: bytes) -> int:
        """Return ``int(token)``, or 0 when *token* is not an integer."""
        try:
            return int(token)
        except ValueError:
            return 0

    @staticmethod
    def strip_colors(data: bytes) -> bytes:
        """Return *data* with all colour escape sequences removed."""
        return _COLORS_RE.sub(b"", data)
|