diff --git a/nats/aio/client.py b/nats/aio/client.py index c8a9bfe1..81bb21bb 100644 --- a/nats/aio/client.py +++ b/nats/aio/client.py @@ -52,6 +52,7 @@ from .errors import ErrInvalidUserCredentials, ErrStaleConnection from .msg import Msg +from .stats import ClientStats, StatsInterface from .subscription import ( DEFAULT_SUB_PENDING_BYTES_LIMIT, DEFAULT_SUB_PENDING_MSGS_LIMIT, @@ -127,6 +128,7 @@ class Srv: server_version: Optional[str] = None + class ServerVersion: def __init__(self, server_version: str) -> None: @@ -230,6 +232,7 @@ class Client: """ msg_class: type[Msg] = Msg + stats: StatsInterface # FIXME: Use an enum instead. DISCONNECTED = 0 @@ -314,14 +317,7 @@ def __init__(self) -> None: self._public_nkey: Optional[str] = None self.options: Dict[str, Any] = {} - self.stats = { - "in_msgs": 0, - "out_msgs": 0, - "in_bytes": 0, - "out_bytes": 0, - "reconnects": 0, - "errors_received": 0, - } + self.stats = ClientStats() async def connect( self, @@ -947,8 +943,7 @@ async def _send_publish( hdr.extend(_CRLF_) pub_cmd = prot_command.hpub_cmd(subject, reply, hdr, payload) - self.stats["out_msgs"] += 1 - self.stats["out_bytes"] += payload_size + self.stats.message_sent(subject, payload_size, headers) await self._send_command(pub_cmd) if self._flush_queue is not None and self._flush_queue.empty(): await self._flush_pending() @@ -1510,7 +1505,7 @@ async def _attempt_reconnect(self) -> None: # Consider a reconnect to be done once CONNECT was # processed by the server successfully. - self.stats["reconnects"] += 1 + self.stats.client_reconnected() # Reset reconnect attempts for this server # since have successfully connected. @@ -1749,8 +1744,8 @@ async def _process_msg( Process MSG sent by server. """ payload_size = len(data) - self.stats["in_msgs"] += 1 - self.stats["in_bytes"] += payload_size + hdr = await self._process_headers(headers) + self.stats.message_received(subject.decode(), payload_size, hdr) sub = self._subs.get(sid) if not sub: @@ -1764,8 +1759,6 @@ async def _process_msg( # internal queue and the task will finish once the last # message is processed. self._subs.pop(sid, None) - - hdr = await self._process_headers(headers) msg = self._build_message(sid, subject, reply, data, hdr) if not msg: return diff --git a/nats/aio/stats.py b/nats/aio/stats.py new file mode 100644 index 00000000..813c32bc --- /dev/null +++ b/nats/aio/stats.py @@ -0,0 +1,77 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from collections import UserDict +from typing import Dict, Optional + + +class StatsInterface(ABC): + """ + Abstract base class defining the interface for NATS client statistics tracking. + """ + + @abstractmethod + def message_received( + self, subject: str, payload_size: int, headers: Optional[Dict[str, str]] = None + ) -> None: + """Record an incoming message with its payload size.""" + pass + + @abstractmethod + def message_sent( + self, subject: str, payload_size: int, headers: Optional[Dict[str, str]] = None + ) -> None: + """Record an outgoing message with its payload size.""" + pass + + @abstractmethod + def client_reconnected(self) -> None: + """Record a client reconnection.""" + pass + + @abstractmethod + def error_received(self) -> None: + """Record a server error.""" + pass + + +class ClientStats(StatsInterface, UserDict): + """ + ClientStats tracks NATS client connection statistics and acts as a dict + for backward compatibility while providing structured methods for updates. + """ + + def __init__(self) -> None: + super().__init__( + { + "in_msgs": 0, + "out_msgs": 0, + "in_bytes": 0, + "out_bytes": 0, + "reconnects": 0, + "errors_received": 0, + } + ) + + def message_received( + self, subject: str, payload_size: int, headers: Optional[Dict[str, str]] = None + ) -> None: + """Record an incoming message with its payload size.""" + self.data["in_msgs"] += 1 + self.data["in_bytes"] += payload_size + + def message_sent( + self, subject: str, payload_size: int, headers: Optional[Dict[str, str]] = None + ) -> None: + """Record an outgoing message with its payload size.""" + self.data["out_msgs"] += 1 + self.data["out_bytes"] += payload_size + + def client_reconnected(self) -> None: + """Record a client reconnection.""" + self.data["reconnects"] += 1 + + def error_received(self) -> None: + """Record a server error.""" + self.data["errors_received"] += 1 +