""" Async UDP protocol for querying Xonotic servers via RCON. Supports all three RCON modes defined in config.py: 0 – nonsecure (plain password in packet) 1 – secure-time (HMAC-MD4 with timestamp) 2 – secure-challenge (HMAC-MD4 with server challenge — recommended) """ from __future__ import annotations import asyncio import logging import time from typing import Optional from xrcon import utils as xutils from .config import ( RCON_MODE_NONSECURE, RCON_MODE_SECURE_TIME, RCON_MODE_SECURE_CHALLENGE, ServerConfig, ) from .metrics_parser import ParseError, XonoticMetrics, XonoticMetricsParser log = logging.getLogger(__name__) # Quake-style ping _PING_PACKET = b"\xFF\xFF\xFF\xFFping" _PONG_PACKET = b"\xFF\xFF\xFF\xFFack" class RconError(OSError): """Raised when all retries for an RCON command are exhausted.""" # ── Protocol implementation ──────────────────────────────────────────────────── class _XonoticProtocol(asyncio.DatagramProtocol): """ Low-level asyncio UDP protocol. One instance per connection (create_datagram_endpoint). """ def __init__(self, password: str, rcon_mode: int, timeout: float = 5.0) -> None: self.password = password self.rcon_mode = rcon_mode self.timeout = timeout self.transport: Optional[asyncio.DatagramTransport] = None self._remote_addr: Optional[tuple] = None # Futures set by datagram_received self._ping_fut: Optional[asyncio.Future] = None self._challenge_fut: Optional[asyncio.Future] = None # Queue for incoming RCON response chunks self._rcon_queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=200) # Locks so concurrent callers serialise self._ping_lock = asyncio.Lock() self._challenge_lock = asyncio.Lock() # ── asyncio.DatagramProtocol callbacks ──────────────────────────────────── def connection_made(self, transport: asyncio.DatagramTransport) -> None: # type: ignore[override] self.transport = transport self._remote_addr = transport.get_extra_info("peername") log.debug("UDP endpoint ready → %s", self._remote_addr) def datagram_received(self, data: bytes, addr: tuple) -> None: if addr != self._remote_addr: return # ignore stray packets if data == _PONG_PACKET: fut = self._ping_fut if fut and not fut.done(): fut.set_result(time.monotonic()) elif data.startswith(xutils.CHALLENGE_RESPONSE_HEADER): fut = self._challenge_fut if fut and not fut.done(): fut.set_result(xutils.parse_challenge_response(data)) elif data.startswith(xutils.RCON_RESPONSE_HEADER): chunk = xutils.parse_rcon_response(data) try: self._rcon_queue.put_nowait(chunk) except asyncio.QueueFull: log.warning("RCON receive queue full — dropping packet from %s", addr) def error_received(self, exc: Exception) -> None: log.debug("UDP error from %s: %s", self._remote_addr, exc) def connection_lost(self, exc: Optional[Exception]) -> None: log.debug("UDP connection closed (%s)", self._remote_addr) # ── Higher-level helpers ────────────────────────────────────────────────── async def ping(self) -> float: """Return round-trip time in seconds.""" async with self._ping_lock: self._ping_fut = asyncio.get_running_loop().create_future() try: t0 = time.monotonic() self.transport.sendto(_PING_PACKET) await asyncio.wait_for(asyncio.shield(self._ping_fut), self.timeout) return time.monotonic() - t0 finally: self._ping_fut = None async def _getchallenge(self) -> bytes: """Request and return the server's challenge string.""" async with self._challenge_lock: self._challenge_fut = asyncio.get_running_loop().create_future() try: self.transport.sendto(xutils.CHALLENGE_PACKET) return await asyncio.wait_for( asyncio.shield(self._challenge_fut), self.timeout ) finally: self._challenge_fut = None async def rcon(self, command: str) -> None: """Send *command* via RCON using the configured mode.""" if self.rcon_mode == RCON_MODE_NONSECURE: pkt = xutils.rcon_nosecure_packet(self.password, command) self.transport.sendto(pkt) elif self.rcon_mode == RCON_MODE_SECURE_TIME: pkt = xutils.rcon_secure_time_packet(self.password, command) self.transport.sendto(pkt) elif self.rcon_mode == RCON_MODE_SECURE_CHALLENGE: challenge = await self._getchallenge() pkt = xutils.rcon_secure_challenge_packet(self.password, challenge, command) self.transport.sendto(pkt) else: raise ValueError(f"Unknown rcon_mode: {self.rcon_mode}") async def read_rcon_response(self) -> XonoticMetrics: """ Uses an adaptive wait: first chunk establishes a baseline RTT, subsequent waits are scaled to allow for multi-packet responses. """ parser = XonoticMetricsParser() # first chunk — use the full timeout t0 = time.monotonic() chunk = await asyncio.wait_for(self._rcon_queue.get(), self.timeout) rtt = time.monotonic() - t0 parser.feed(chunk) while not parser.done: wait = max(rtt * 2.0, 0.3) try: t0 = time.monotonic() chunk = await asyncio.wait_for(self._rcon_queue.get(), wait) rtt = rtt * 0.8 + (time.monotonic() - t0) * 0.2 parser.feed(chunk) except asyncio.TimeoutError: # No more data — if the parser is not done the server may # have sent a truncated response; accept what we have. log.debug("Timed out waiting for more RCON data (partial parse)") break return parser.metrics # ── Public scrape function ──────────────────────────────────────────────────── async def scrape_server( server: ServerConfig, retries: int = 3, timeout: float = 5.0, ) -> XonoticMetrics: """ Open a UDP endpoint to *server*, send the RCON query, parse and return :class:`~metrics_parser.XonoticMetrics`. Retries up to *retries* times on transient failures. Always closes the transport before returning. """ last_exc: Exception = RconError("no attempts made") for attempt in range(1, retries + 1): transport: Optional[asyncio.DatagramTransport] = None try: loop = asyncio.get_running_loop() proto_factory = lambda: _XonoticProtocol( # noqa: E731 password=server.rcon_password, rcon_mode=server.rcon_mode, timeout=timeout, ) transport, proto = await loop.create_datagram_endpoint( proto_factory, remote_addr=(server.host, server.port), ) # fire ping and metrics query concurrently ping_task = asyncio.create_task(proto.ping()) # send combined command: sv_public + status 1 await proto.rcon("sv_public\x00status 1") metrics = await proto.read_rcon_response() # collect ping (it may finish before or after metrics) try: metrics.ping = await asyncio.wait_for(ping_task, timeout) except asyncio.TimeoutError: metrics.ping = 0.0 log.debug("[%s] ping timed out on attempt %d", server.name, attempt) log.debug("[%s] scrape OK on attempt %d", server.name, attempt) return metrics except (OSError, asyncio.TimeoutError, ParseError) as exc: last_exc = exc log.warning( "[%s] scrape attempt %d/%d failed: %s", server.name, attempt, retries, exc, ) finally: if transport is not None: transport.close() raise RconError( f"[{server.name}] all {retries} scrape attempts failed" ) from last_exc async def scrape_match( server: ServerConfig, retries: int = 3, timeout: float = 5.0, ) -> dict: """ Scrape match metadata: timelimit, fraglimit, teamplay. Returns a plain dict with keys: timelimit, fraglimit, teamplay. """ import re _CVAR_RE = re.compile(rb'(\w+) is "(\d+)') last_exc: Exception = RconError("no attempts made") for attempt in range(1, retries + 1): transport = None try: loop = asyncio.get_running_loop() proto_factory = lambda: _XonoticProtocol( password=server.rcon_password, rcon_mode=server.rcon_mode, timeout=timeout, ) transport, proto = await loop.create_datagram_endpoint( proto_factory, remote_addr=(server.host, server.port), ) await proto.rcon("timelimit\x00fraglimit\x00teamplay") # collect response chunks result = {} deadline = asyncio.get_running_loop().time() + timeout while asyncio.get_running_loop().time() < deadline: try: chunk = await asyncio.wait_for( proto._rcon_queue.get(), 1.0 ) clean = re.sub(rb'\^\d', b'', chunk) for m in _CVAR_RE.finditer(clean): key = m.group(1).decode("utf-8", "replace") val = int(m.group(2)) result[key] = val if len(result) >= 3: break except asyncio.TimeoutError: break return result except (OSError, asyncio.TimeoutError) as exc: last_exc = exc log.warning("[%s] match scrape attempt %d/%d failed: %s", server.name, attempt, retries, exc) finally: if transport is not None: transport.close() raise RconError(f"[{server.name}] match scrape failed") from last_exc