Skip to content

Commit 73765ad

Browse files
committed
feat(nats/client): make stats extensible
Replace the stats dictionary by a class and interface, that is extensible to allow users to replace the stats collection by their own custom stats collector. Resolves #720 Signed-off-by: Tim Drijvers <[email protected]>
1 parent 8f53949 commit 73765ad

File tree

2 files changed

+88
-15
lines changed

2 files changed

+88
-15
lines changed

nats/aio/client.py

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252

5353
from .errors import ErrInvalidUserCredentials, ErrStaleConnection
5454
from .msg import Msg
55+
from .stats import ClientStats, StatsInterface
5556
from .subscription import (
5657
DEFAULT_SUB_PENDING_BYTES_LIMIT,
5758
DEFAULT_SUB_PENDING_MSGS_LIMIT,
@@ -127,6 +128,7 @@ class Srv:
127128
server_version: Optional[str] = None
128129

129130

131+
130132
class ServerVersion:
131133

132134
def __init__(self, server_version: str) -> None:
@@ -230,6 +232,7 @@ class Client:
230232
"""
231233

232234
msg_class: type[Msg] = Msg
235+
stats: StatsInterface
233236

234237
# FIXME: Use an enum instead.
235238
DISCONNECTED = 0
@@ -314,14 +317,7 @@ def __init__(self) -> None:
314317
self._public_nkey: Optional[str] = None
315318

316319
self.options: Dict[str, Any] = {}
317-
self.stats = {
318-
"in_msgs": 0,
319-
"out_msgs": 0,
320-
"in_bytes": 0,
321-
"out_bytes": 0,
322-
"reconnects": 0,
323-
"errors_received": 0,
324-
}
320+
self.stats = ClientStats()
325321

326322
async def connect(
327323
self,
@@ -947,8 +943,7 @@ async def _send_publish(
947943
hdr.extend(_CRLF_)
948944
pub_cmd = prot_command.hpub_cmd(subject, reply, hdr, payload)
949945

950-
self.stats["out_msgs"] += 1
951-
self.stats["out_bytes"] += payload_size
946+
self.stats.message_sent(subject, payload_size, headers)
952947
await self._send_command(pub_cmd)
953948
if self._flush_queue is not None and self._flush_queue.empty():
954949
await self._flush_pending()
@@ -1510,7 +1505,7 @@ async def _attempt_reconnect(self) -> None:
15101505

15111506
# Consider a reconnect to be done once CONNECT was
15121507
# processed by the server successfully.
1513-
self.stats["reconnects"] += 1
1508+
self.stats.client_reconnected()
15141509

15151510
# Reset reconnect attempts for this server
15161511
# since have successfully connected.
@@ -1749,8 +1744,8 @@ async def _process_msg(
17491744
Process MSG sent by server.
17501745
"""
17511746
payload_size = len(data)
1752-
self.stats["in_msgs"] += 1
1753-
self.stats["in_bytes"] += payload_size
1747+
hdr = await self._process_headers(headers)
1748+
self.stats.message_received(subject.decode(), payload_size, hdr)
17541749

17551750
sub = self._subs.get(sid)
17561751
if not sub:
@@ -1764,8 +1759,6 @@ async def _process_msg(
17641759
# internal queue and the task will finish once the last
17651760
# message is processed.
17661761
self._subs.pop(sid, None)
1767-
1768-
hdr = await self._process_headers(headers)
17691762
msg = self._build_message(sid, subject, reply, data, hdr)
17701763
if not msg:
17711764
return

nats/aio/stats.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# Copyright 2016-2023 The NATS Authors
2+
# Licensed under the Apache License, Version 2.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
#
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
#
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS,
10+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
# See the License for the specific language governing permissions and
12+
# limitations under the License.
13+
#
14+
15+
from __future__ import annotations
16+
17+
from abc import ABC, abstractmethod
18+
from collections import UserDict
19+
from typing import Dict, Optional
20+
21+
22+
class StatsInterface(ABC):
23+
"""
24+
Abstract base class defining the interface for NATS client statistics tracking.
25+
"""
26+
27+
@abstractmethod
28+
def message_received(self, subject: str, payload_size: int, headers: Optional[Dict[str, str]] = None) -> None:
29+
"""Record an incoming message with its payload size."""
30+
pass
31+
32+
@abstractmethod
33+
def message_sent(self, subject: str, payload_size: int, headers: Optional[Dict[str, str]] = None) -> None:
34+
"""Record an outgoing message with its payload size."""
35+
pass
36+
37+
@abstractmethod
38+
def client_reconnected(self) -> None:
39+
"""Record a client reconnection."""
40+
pass
41+
42+
@abstractmethod
43+
def error_received(self) -> None:
44+
"""Record a server error."""
45+
pass
46+
47+
48+
class ClientStats(StatsInterface, UserDict):
49+
"""
50+
ClientStats tracks NATS client connection statistics and acts as a dict
51+
for backward compatibility while providing structured methods for updates.
52+
"""
53+
54+
def __init__(self) -> None:
55+
super().__init__({
56+
"in_msgs": 0,
57+
"out_msgs": 0,
58+
"in_bytes": 0,
59+
"out_bytes": 0,
60+
"reconnects": 0,
61+
"errors_received": 0,
62+
})
63+
64+
def message_received(self, subject: str, payload_size: int, headers: Optional[Dict[str, str]] = None) -> None:
65+
"""Record an incoming message with its payload size."""
66+
self.data["in_msgs"] += 1
67+
self.data["in_bytes"] += payload_size
68+
69+
def message_sent(self, subject: str, payload_size: int, headers: Optional[Dict[str, str]] = None) -> None:
70+
"""Record an outgoing message with its payload size."""
71+
self.data["out_msgs"] += 1
72+
self.data["out_bytes"] += payload_size
73+
74+
def client_reconnected(self) -> None:
75+
"""Record a client reconnection."""
76+
self.data["reconnects"] += 1
77+
78+
def error_received(self) -> None:
79+
"""Record a server error."""
80+
self.data["errors_received"] += 1

0 commit comments

Comments
 (0)