from __future__ import annotations import socket import threading from dataclasses import dataclass, field from typing import Callable, Optional @dataclass class ServerClient: sock: socket.socket addr: tuple id: int description: Optional[str] = None lock: threading.Lock = field(default_factory=threading.Lock) buffer: bytes = b"" in_game: bool = False def send_line(self, line: str) -> None: data = (line + "\n").encode("utf-8") with self.lock: try: self.sock.sendall(data) except OSError: pass def read_line(self) -> Optional[str]: while True: if b"\n" in self.buffer: line, self.buffer = self.buffer.split(b"\n", 1) return line.decode("utf-8", errors="replace").rstrip("\r") try: chunk = self.sock.recv(4096) except OSError: return None if not chunk: return None self.buffer += chunk def close(self) -> None: try: self.sock.close() except OSError: pass class ServerListener: def __init__(self, port: int) -> None: self.port = port self._next_id = 1 self._lock = threading.Lock() def _alloc_id(self) -> int: with self._lock: cid = self._next_id self._next_id += 1 return cid def accept_loop(self, on_new_client: Callable[[ServerClient], None]) -> None: srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) srv.bind(("", self.port)) srv.listen() try: while True: conn, addr = srv.accept() client = ServerClient(sock=conn, addr=addr, id=self._alloc_id()) on_new_client(client) finally: srv.close()