Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 8 additions & 15 deletions nats/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@

from .errors import ErrInvalidUserCredentials, ErrStaleConnection
from .msg import Msg
from .stats import ClientStats, StatsInterface
from .subscription import (
DEFAULT_SUB_PENDING_BYTES_LIMIT,
DEFAULT_SUB_PENDING_MSGS_LIMIT,
Expand Down Expand Up @@ -127,6 +128,7 @@ class Srv:
server_version: Optional[str] = None



class ServerVersion:

def __init__(self, server_version: str) -> None:
Expand Down Expand Up @@ -230,6 +232,7 @@ class Client:
"""

msg_class: type[Msg] = Msg
stats: StatsInterface

# FIXME: Use an enum instead.
DISCONNECTED = 0
Expand Down Expand Up @@ -314,14 +317,7 @@ def __init__(self) -> None:
self._public_nkey: Optional[str] = None

self.options: Dict[str, Any] = {}
self.stats = {
"in_msgs": 0,
"out_msgs": 0,
"in_bytes": 0,
"out_bytes": 0,
"reconnects": 0,
"errors_received": 0,
}
self.stats = ClientStats()

async def connect(
self,
Expand Down Expand Up @@ -947,8 +943,7 @@ async def _send_publish(
hdr.extend(_CRLF_)
pub_cmd = prot_command.hpub_cmd(subject, reply, hdr, payload)

self.stats["out_msgs"] += 1
self.stats["out_bytes"] += payload_size
self.stats.message_sent(subject, payload_size, headers)
await self._send_command(pub_cmd)
if self._flush_queue is not None and self._flush_queue.empty():
await self._flush_pending()
Expand Down Expand Up @@ -1510,7 +1505,7 @@ async def _attempt_reconnect(self) -> None:

# Consider a reconnect to be done once CONNECT was
# processed by the server successfully.
self.stats["reconnects"] += 1
self.stats.client_reconnected()

# Reset reconnect attempts for this server
# since have successfully connected.
Expand Down Expand Up @@ -1749,8 +1744,8 @@ async def _process_msg(
Process MSG sent by server.
"""
payload_size = len(data)
self.stats["in_msgs"] += 1
self.stats["in_bytes"] += payload_size
hdr = await self._process_headers(headers)
self.stats.message_received(subject.decode(), payload_size, hdr)

sub = self._subs.get(sid)
if not sub:
Expand All @@ -1764,8 +1759,6 @@ async def _process_msg(
# internal queue and the task will finish once the last
# message is processed.
self._subs.pop(sid, None)

hdr = await self._process_headers(headers)
msg = self._build_message(sid, subject, reply, data, hdr)
if not msg:
return
Expand Down
77 changes: 77 additions & 0 deletions nats/aio/stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from collections import UserDict
from typing import Dict, Optional


class StatsInterface(ABC):
"""
Abstract base class defining the interface for NATS client statistics tracking.
"""

@abstractmethod
def message_received(
self, subject: str, payload_size: int, headers: Optional[Dict[str, str]] = None
) -> None:
"""Record an incoming message with its payload size."""
pass

@abstractmethod
def message_sent(
self, subject: str, payload_size: int, headers: Optional[Dict[str, str]] = None
) -> None:
"""Record an outgoing message with its payload size."""
pass

@abstractmethod
def client_reconnected(self) -> None:
"""Record a client reconnection."""
pass

@abstractmethod
def error_received(self) -> None:
"""Record a server error."""
pass


class ClientStats(StatsInterface, UserDict):
"""
ClientStats tracks NATS client connection statistics and acts as a dict
for backward compatibility while providing structured methods for updates.
"""

def __init__(self) -> None:
super().__init__(
{
"in_msgs": 0,
"out_msgs": 0,
"in_bytes": 0,
"out_bytes": 0,
"reconnects": 0,
"errors_received": 0,
}
)

def message_received(
self, subject: str, payload_size: int, headers: Optional[Dict[str, str]] = None
) -> None:
"""Record an incoming message with its payload size."""
self.data["in_msgs"] += 1
self.data["in_bytes"] += payload_size

def message_sent(
self, subject: str, payload_size: int, headers: Optional[Dict[str, str]] = None
) -> None:
"""Record an outgoing message with its payload size."""
self.data["out_msgs"] += 1
self.data["out_bytes"] += payload_size

def client_reconnected(self) -> None:
"""Record a client reconnection."""
self.data["reconnects"] += 1

def error_received(self) -> None:
"""Record a server error."""
self.data["errors_received"] += 1

Loading